In [1]:
from security import Credentials
from db import FirestoreDB
from firebase_admin import initialize_app
from firebase_admin.credentials import Certificate
from concurrent.futures import ThreadPoolExecutor
from services import (
    service_provider,
    NLPService,
    EngineService,
    MarketDataService,
    NewsService,
)
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from nlp.models import NLPModel
from nlp.processing import (
    NLPPostProcessor,
    NLPTokenizer,
)
from pipelines import Pipeline
from engine import Engine
from multiprocessing.pool import Pool
from adapters import OHLCVAdapter
import math
import os


Trying to open credentials: [c:\Users\tomsr\Documents\School\aidi\student projects\crypto_bot\server/keys.json]


  from .autonotebook import tqdm as notebook_tqdm


Trying to open credentials: [c:\Users\tomsr\Documents\School\aidi\student projects\crypto_bot\server/keys.json]


In [2]:
 # -- App Providers --
app = initialize_app(Certificate(Credentials("firebase").load()))

# -- Storage --
# Preferably Firestore storage from firebase.
db = FirestoreDB(app)

# -- Threading --
threads = ThreadPoolExecutor(
    max_workers=int(math.floor(os.cpu_count() / 2)),
    thread_name_prefix="CTB",
)

# -- Multi Processing --
pool = Pool(processes=int(math.floor(os.cpu_count() / 2)))
service_provider.add(
    NLPService(
        model=NLPModel(
            model=AutoModelForSequenceClassification.from_pretrained(
                "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
            ),
            is_auto_model=True,
        ),
        tokenizer=NLPTokenizer(
            tokenizer=AutoTokenizer.from_pretrained(
                "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
            )
        ),
        postprocessor=NLPPostProcessor(
            chain=[
                "set",
            ]
        ),
    )
)
service_provider.add(NewsService())
service_provider.add(MarketDataService())
service_provider.add(EngineService(service_provider=service_provider))

Trying to open credentials: [c:\Users\tomsr\Documents\School\aidi\student projects\crypto_bot\server/keys.json]
Service added: [<services.nlp_service.NLPService object at 0x000002BF60E80290>]
Service added: [<services.news_service.NewsService object at 0x000002BF606C8210>]
Service added: [<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>]
Service added: [<services.engine_service.EngineService object at 0x000002BF60E80390>]


In [3]:
context = {
    "db": db,
    "threads": threads,
    "pool": pool,
    "service_provider": service_provider,
    "adapter": OHLCVAdapter(),
}

In [4]:
ts_pipeline_get_earliest = Pipeline(
    service_provider(MarketDataService).get_earliest,
    name="ts_pipeline_earliest_get",
)

ts_pipeline_fetch_earliest = Pipeline(
    service_provider(MarketDataService).fetch_earliest,
    name="ts_pipeline_earliest_fetch",
)

ts_pipeline_filter = Pipeline(
    service_provider(MarketDataService).filter,
    name="ts_pipeline_filter",
)

ts_pipeline_batch_earliest = Pipeline(
    service_provider(MarketDataService).batch_earliest,
    name="ts_pipeline_batch_earliest",
)

ts_pipeline_write = Pipeline(
    service_provider(MarketDataService).write,
    name="ts_pipeline_write",
)

engine = Engine(
    context=context,
    pipelines=[
        ts_pipeline_get_earliest,
        ts_pipeline_fetch_earliest,
        ts_pipeline_filter,
        ts_pipeline_batch_earliest,
        ts_pipeline_write,
    ],
)

In [5]:
engine.run_sequential()

[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running get_earliest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running fetch_earliest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running filter function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running batch_earliest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running store function
pipelines.pipeline executed


{'db': <db.firestore.FirestoreDB at 0x2bf60e63510>,
 'threads': <concurrent.futures.thread.ThreadPoolExecutor at 0x2bf60747190>,
 'pool': <multiprocessing.pool.Pool state=RUN pool_size=8>,
 'service_provider': <services.service_provider.ServiceProvider at 0x2bf34845210>,
 'ohlcv_batches': [{'batch': '[["1709383500000", "62073.1", "62084", "62073", "62083.8", "1.6035683", "BTC-USDT"], ["1709383560000", "62083.9", "62124.2", "62083.8", "62124.2", "2.10688746", "BTC-USDT"], ["1709383620000", "62124.2", "62137.8", "62124.1", "62137.8", "4.43183962", "BTC-USDT"], ["1709383680000", "62137.8", "62158.6", "62116.7", "62158.5", "36.07805827", "BTC-USDT"], ["1709383740000", "62158.6", "62160", "62095.4", "62095.5", "6.92189948", "BTC-USDT"], ["1709383800000", "62095.5", "62125.9", "62095.5", "62119.8", "3.04341615", "BTC-USDT"], ["1709383860000", "62119.8", "62119.9", "62089.6", "62089.6", "2.7555549", "BTC-USDT"], ["1709383920000", "62089.7", "62089.7", "62056.5", "62059.1", "2.83634756", "BTC-

In [6]:
ts_pipeline_get_latest = Pipeline(
    service_provider(MarketDataService).get_latest,
    name="ts_pipeline_latest_get",
)

ts_pipeline_fetch_latest = Pipeline(
    service_provider(MarketDataService).fetch_latest,
    name="ts_pipeline_latest_fetch",
)


ts_pipeline_batch_latest = Pipeline(
    service_provider(MarketDataService).batch_latest,
    name="ts_pipeline_batch_latest",
)


engine = Engine(
    context=context,
    pipelines=[
        ts_pipeline_get_latest,
        ts_pipeline_fetch_latest,
        ts_pipeline_filter,
        ts_pipeline_batch_latest,
        ts_pipeline_write,
    ],
)

In [7]:
engine.run_sequential()

[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running get_earliest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running fetch_latest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running filter function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running batch_latest function
pipelines.pipeline executed
[<services.market_data_service.MarketDataService object at 0x000002BF60E7AD50>] Running store function
pipelines.pipeline executed


{'db': <db.firestore.FirestoreDB at 0x2bf60e63510>,
 'threads': <concurrent.futures.thread.ThreadPoolExecutor at 0x2bf60747190>,
 'pool': <multiprocessing.pool.Pool state=RUN pool_size=8>,
 'service_provider': <services.service_provider.ServiceProvider at 0x2bf34845210>,
 'ohlcv_batches': [{'batch': '[["1709387040000", "61943.3", "61966", "61943.3", "61964.7", "0.96178701", "BTC-USDT"], ["1709387100000", "61964.6", "61988.5", "61959", "61973.8", "3.71889179", "BTC-USDT"], ["1709387160000", "61970.1", "61988.5", "61970.1", "61983.8", "0.68766412", "BTC-USDT"], ["1709387220000", "61983.8", "61983.8", "61977.3", "61979.9", "0.62058056", "BTC-USDT"], ["1709387280000", "61979.9", "61979.9", "61950.1", "61950.2", "4.08994212", "BTC-USDT"], ["1709387340000", "61951.4", "61956.7", "61947.7", "61953.7", "1.28275603", "BTC-USDT"], ["1709387400000", "61953.7", "61953.8", "61950.1", "61950.3", "0.71299066", "BTC-USDT"], ["1709387460000", "61950.2", "61950.2", "61948.1", "61949", "0.72337932", "BTC