In [1]:
# Enable IPython's autoreload extension so edits to imported project modules
# are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import datetime
import json
import signal
import sqlite3
import sys
from pathlib import Path
from typing import List, Tuple

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from denormalized import Context, DataStream
from denormalized.datafusion import col
from denormalized.datafusion import functions as f
from denormalized.datafusion import lit
from feast import FeatureStore
from feast.data_source import PushMode
from IPython.display import HTML, display
from tabulate import tabulate

In [45]:
# Open the Feast feature repository, resolved against the notebook's current
# working directory.
repo_path = Path("./src/feast_test/feature_repo/")
try:
    fs = FeatureStore(repo_path=str(repo_path.resolve()))
except Exception as e:
    # Later cells use `fs` unconditionally. Printing the error and carrying on
    # would leave `fs` undefined and surface as an unrelated NameError further
    # down, so fail here with the path that could not be loaded.
    raise RuntimeError(
        f"Could not open Feast feature repo at {repo_path.resolve()}"
    ) from e


In [None]:
# Push the contents of `df` to the "push_sensor_statistics" push source,
# targeting the online store only.
# NOTE(review): `df` is not assigned in any visible, un-commented cell —
# confirm where it is expected to come from before running this cell.
fs.push(
    push_source_name="push_sensor_statistics",
    df=df,
    to=PushMode.ONLINE,
)

In [8]:
# Fetch the entities known to the feature store. The result is kept in
# `entities` for inspection; no later visible cell reads it.
entities = fs.list_entities()

In [75]:
# Read the max and min features of the "sensor_statistics" feature view for
# sensor_1 and sensor_2 from the online store, then print them as a dict.
requested_features = [
    "sensor_statistics:max",
    "sensor_statistics:min",
]
sensor_rows = [
    {"sensor_name": "sensor_1"},
    {"sensor_name": "sensor_2"},
]
online_response = fs.get_online_features(
    features=requested_features,
    entity_rows=sensor_rows,
)
feature_vector = online_response.to_dict()
print(feature_vector)

{'sensor_name': ['sensor_1', 'sensor_2'], 'avg_2': [57.61820697710939, 55.67418363036297], 'max': [114.97685605369205, 114.95627792014], 'min': [0.14954122417143056, 0.01963995076755265]}


In [None]:
# # Specify the path to your SQLite database file
# with sqlite3.connect('./src/feast_test/feature_repo/data/online_store.db') as conn:
#     # cursor = conn.cursor()
#     # cursor.execute("SELECT * FROM denormalized_test_sensor_statistics;")
#     # rows = [r for r in cursor.fetchall()]
#     # column_names = [description[0].ljust(50, "*") for description in cursor.description]

#     # # rows = [[v.decode('utf-8', errors='ignore').strip() if isinstance(v, bytes) else v for v in row] for row in rows]
#     # # rows = [[type(v) for v in row] for row in rows]

#     # print(tabulate(rows, headers=column_names, tablefmt="double_outline"))
    
#     # print(f"\nTotal number of rows: {len(rows)}")

#     from IPython.display import HTML, display
#     df = pd.read_sql_query("SELECT * FROM denormalized_test_sensor_statistics;", conn)
#     display(df)

In [None]:
# NOTE(review): `rows` is only assigned inside commented-out code in the
# visible part of this notebook — confirm where it comes from before running.
# Split the first column of the first row on runs of three NUL bytes.
parts = rows[0][0].split(b'\x00\x00\x00')
print(parts)
# Decode every chunk as UTF-8, silently dropping undecodable bytes, and trim
# surrounding whitespace.
decoded_parts = [chunk.decode('utf-8', errors='ignore').strip() for chunk in parts]
print(decoded_parts)
# Keep only the chunks that are still non-empty after decoding and trimming.
readable_parts = list(filter(None, decoded_parts))
print(readable_parts)

In [4]:
import os

# Make the project's ./src directory importable. The membership check keeps the
# cell idempotent: re-running it no longer appends a duplicate sys.path entry
# each time.
src_dir = os.path.abspath('./src')
if src_dir not in sys.path:
    sys.path.append(src_dir)

# `ds` is the stream object defined in the project's stream job module.
from feast_test.stream_job import ds

In [5]:
# Fetch the stream's schema once; the following cell iterates over its fields.
schema = ds.schema()

In [6]:
# Print the name of every field in the stream's schema, one per line.
for s in schema:
    print(s.name)

sensor_name
count
min
max
average
window_start_time
window_end_time


In [13]:
# Print whatever the stream reports as its Feast schema.
# NOTE(review): get_feast_schema() is defined outside this notebook; presumably
# it maps each stream column to a Feast field — confirm against its source.
t = ds.get_feast_schema()
print(t)

[sensor_name-String, count-Int64, min-Float64, max-Float64, average-Float64, window_start_time-UnixTimestamp, window_end_time-UnixTimestamp]


In [45]:
from feast.type_map import pa_to_feast_value_type
from feast.value_type import ValueType

def ds_to_feast_types(ds: DataStream) -> List[Tuple[str, ValueType]]:
    """Pair every column of a stream's schema with its Feast value type.

    Args:
        ds: The stream whose ``schema()`` is inspected.

    Returns:
        One ``(column_name, feast_value_type)`` tuple per schema field, in the
        order the schema yields them. The value type is obtained by passing
        the string form of the field's type to ``pa_to_feast_value_type``.
    """
    return [
        (s.name, pa_to_feast_value_type(str(s.type))) for s in ds.schema()
    ]

In [49]:
# Show each stream column next to the Feast value type it maps to.
for name, feast_type in ds_to_feast_types(ds):
    print(name, feast_type)

sensor_name ValueType.STRING
count ValueType.INT64
min ValueType.DOUBLE
max ValueType.DOUBLE
average ValueType.DOUBLE
window_start_time ValueType.UNIX_TIMESTAMP
window_end_time ValueType.UNIX_TIMESTAMP


In [54]:
# Import the submodule explicitly: a bare `import importlib` does not guarantee
# that `importlib.util` has been loaded.
import importlib.util

def is_feast_available():
    """Return True if the ``feast`` package can be located, without importing it."""
    return importlib.util.find_spec("feast") is not None


In [55]:
# Check whether the feast package can be found in the current environment.
is_feast_available()

True