# Tests

## Create a stream in Redis using Aia Utilities

In [3]:
from zoneinfo import ZoneInfo
import aia_utilities as au
from datetime import datetime
import pytz

# Helper objects from aia_utilities shared by the cells below.
ru = au.Redis_Utilities()  # stream operations used below: write, show, read_all, delete, read_each
tm = au.Time_Management()  # used below to turn datetimes into "timestamp" strings (datetime_to_string)

# Name of the stream the cells below operate on; uncomment the next line to switch streams.
# stream = 'test_stream'
stream = 'prices'


## Populate it with n entries with ordered or random timestamps

In [None]:
from time import sleep
from datetime import datetime, timedelta
import pytz
import random

# ordered=True  -> every entry is stamped with the current time, so timestamps never decrease.
# ordered=False -> every entry is stamped with a random moment within +/- 5 minutes of now,
#                  so entries are written out of timestamp order.
ordered = False
iterations = 100
jitter_seconds = 5 * 60  # half-width of the random window around "now"

for i in range(iterations):
    now = datetime.now()
    if ordered:
        moment = now
    else:
        # Draw a float offset in seconds and apply it to a single "now".
        # random.uniform is documented for numbers; passing datetime bounds only
        # worked through its internal a + (b - a) * random() arithmetic.
        moment = now + timedelta(seconds=random.uniform(-jitter_seconds, jitter_seconds))
    timestamp = tm.datetime_to_string(moment)
    item1 = {"timestamp": timestamp, "price": 1.0 + i, "currency": "USD"}
    ru.write(stream, item1)

## Show the stream

In [2]:
# Number of entries to show from each end of the stream. Judging by the recorded
# output, ru.show lowers this itself when the stream holds fewer items.
sample = 100
ru.show(stream, sample)

The items are ordered by timestamp.
8 items, type <class 'list'>
Adjusted sample from 100 to 4 because stream is 8 items.
First 4:
{'timestamp': '2025-09-05 16:58:55.000000', 'instrument': 'USD_CAD', 'price': 1.382745, 'bid': 1.3825, 'ask': 1.38299, 'spread': 0.0004899999999998794}
{'timestamp': '2025-09-05 16:59:00.000000', 'instrument': 'USD_CAD', 'price': 1.3828, 'bid': 1.38258, 'ask': 1.38302, 'spread': 0.00043999999999999595}
{'timestamp': '2025-09-05 16:59:00.000000', 'instrument': 'WTICO_USD', 'price': 62.353, 'bid': 62.338, 'ask': 62.368, 'spread': 0.030000000000001137}
{'timestamp': '2025-09-05 16:59:05.000000', 'instrument': 'WTICO_USD', 'price': 62.358000000000004, 'bid': 62.338, 'ask': 62.378, 'spread': 0.03999999999999915}
Last 4:
{'timestamp': '2025-09-07 16:39:54.615521', 'instrument': 'USD_CAD', 'price': 1.3828, 'bid': 1.38258, 'ask': 1.38302, 'spread_pips': 4.4}
{'timestamp': '2025-09-07 16:39:54.619910', 'instrument': 'WTICO_USD', 'price': 62.358, 'bid': 62.338, 'ask'

## Read all entries, optionally ordered by timestamp

In [8]:
# Toggle passed to read_all as `order`: True or False.
ordered = False

return_dict = ru.read_all(stream, order=ordered)

# Lazily pair each entry's timestamp with its successor's; all() stops at the
# first pair that is out of order, so later entries are not inspected after that.
adjacent_pairs = (
    (return_dict[k]["timestamp"], return_dict[k + 1]["timestamp"])
    for k in range(len(return_dict) - 1)
)
is_ordered = all(earlier <= later for earlier, later in adjacent_pairs)

# An empty or single-item result has no pairs and therefore reports ordered: True.
print(f"{len(return_dict)} items, ordered: {is_ordered}")

0 items, ordered: True


## Delete the stream

In [6]:
# Delete the stream named by `stream`; the recorded output shows the call returning True.
ru.delete(stream)

True

## Test read_each by inserting items randomly and reading them

In [None]:
# Exercise ru.read_each while a background thread appends entries at random
# intervals of 0-5 seconds.
import threading
from time import sleep
import random

# Use one dedicated scratch stream for the delete, the writes and the reads.
# Previously 'test_stream' was deleted while the writer and reader used `stream`
# ('prices'), so the reset never touched the stream under test and the random
# test prices were appended to 'prices'.
read_each_stream = 'test_stream'
ru.delete(read_each_stream)

# Set this event to ask the writer thread to stop.
stop_inserting = threading.Event()


def insert_entries():
    """Write one random-priced entry to the scratch stream every 0-5 seconds.

    Runs until `stop_inserting` is set. Each entry carries a "timestamp",
    a random integer "price" in 0..100 and the "currency" "USD".
    """
    # Event.wait acts as an interruptible sleep: it returns False after the
    # timeout (keep going) and True as soon as the stop event is set (exit).
    while not stop_inserting.wait(random.randint(0, 5)):
        # Current New York time with microsecond precision.
        # NOTE(review): the trailing "Z" conventionally marks UTC, but `now` is
        # New York local time, and this layout differs from the
        # 'YYYY-MM-DD HH:MM:SS.ffffff' timestamps visible in the stream output
        # above -- confirm which format consumers of the stream expect.
        now = datetime.now(pytz.timezone("America/New_York"))
        timestamp = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        item1 = {"timestamp": timestamp, "price": random.randint(0, 100), "currency": "USD"}
        ru.write(read_each_stream, item1)


# daemon=True so a forgotten writer can never keep the process alive.
thread = threading.Thread(target=insert_entries, daemon=True)
thread.start()

try:
    for entry in ru.read_each(read_each_stream):
        print(entry)
finally:
    # Reached when the read loop ends or the cell is interrupted: stop the
    # writer so re-running the cell does not stack up extra writer threads.
    stop_inserting.set()
    thread.join(timeout=5)