In [None]:
import os
import feast

actual_version = feast.__version__
assert actual_version == os.environ.get("FEAST_VERSION"), (
    f"‚ùå Feast version mismatch. Expected: {os.environ.get('FEAST_VERSION')}, Found: {actual_version}"
)
print(f"‚úÖ Found Expected Feast version: {actual_version} in workbench")

In [None]:
# --- Configuration Variables ---
import os 

# Fetch token and server directly from oc CLI
import subprocess

def oc(cmd_list):
    """Safely execute oc commands without shell injection risk."""
    return subprocess.check_output(cmd_list, shell=False).decode("utf-8").strip()

token = oc(["oc", "whoami", "-t"])
server = oc(["oc", "whoami", "--show-server"])
namespace = os.environ.get("NAMESPACE")


In [None]:
# Safely execute oc login without shell injection risk
import subprocess
subprocess.check_output(
    ["oc", "login", "--token", token, "--server", server],
    shell=False
)

In [None]:
# Add user permission to namespace
import subprocess

# Get current user safely
current_user = subprocess.check_output(
    ["oc", "whoami"],
    shell=False
).decode("utf-8").strip()

# Add role to user safely
subprocess.check_output(
    ["oc", "adm", "policy", "add-role-to-user", "admin", current_user, "-n", namespace],
    shell=False
)

In [None]:
import os
import subprocess

namespace = os.environ.get("NAMESPACE")  # read namespace from env
if not namespace:
    raise ValueError("NAMESPACE environment variable is not set")

# Safely execute oc command and pipe through sed without shell injection risk
oc_process = subprocess.Popen(
    ["oc", "get", "configmap", "feast-credit-scoring-client", "-n", namespace,
     "-o", "jsonpath={.data.feature_store\\.yaml}"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    shell=False
)
sed_process = subprocess.Popen(
    ["sed", "s/\\\\n/\\n/g"],
    stdin=oc_process.stdout,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    shell=False
)
oc_process.stdout.close()
yaml_content, _ = sed_process.communicate()
yaml_content = yaml_content.decode("utf-8")

# Save the configmap data into an environment variable (if needed)
os.environ["CONFIGMAP_DATA"] = yaml_content

In [None]:
from feast import FeatureStore
fs_credit_scoring_local = FeatureStore(fs_yaml_file='/opt/app-root/src/feast-config/credit_scoring_local')

In [None]:
fs_credit_scoring_local.refresh_registry()

In [None]:
project_name = "credit_scoring_local"
project = fs_credit_scoring_local.get_project(project_name)

# 1. Assert object returned
assert project is not None, f"‚ùå get_project('{project_name}') returned None"

# 2. Extract project name (works for dict or Feast object)
if isinstance(project, dict):
    returned_name = project.get("spec", {}).get("name")
else:
    # Feast Project object
    returned_name = getattr(project, "name", None)
    if not returned_name and hasattr(project, "spec") and hasattr(project.spec, "name"):
        returned_name = project.spec.name

# 3. Assert that name exists
assert returned_name, f"‚ùå Returned project does not contain a valid name: {project}"

print("‚Ä¢ Project Name Returned:", returned_name)

# 4. Assert the name matches expected
assert returned_name == project_name, (
    f"‚ùå Expected project '{project_name}', but got '{returned_name}'"
)

print(f"\n‚úì get_project('{project_name}') validation passed!")


In [None]:
feast_list_functions = [
    "list_projects",
    "list_entities",
    "list_feature_views",
    "list_all_feature_views",
    "list_batch_feature_views",
    "list_on_demand_feature_views",
]

# validates feast list methods returns data and method type
def validate_list_method(fs_obj, method_name):
    assert hasattr(fs_obj, method_name), f"Method not found: {method_name}"

    method = getattr(fs_obj, method_name)
    result = method()

    assert isinstance(result, list), (
        f"{method_name}() must return a list, got {type(result)}"
    )
    assert len(result) > 0, (
        f"{method_name}() returned an empty list ‚Äî expected data"
    )

    print(f"‚úì {method_name}() returned {len(result)} items")

for m in feast_list_functions:
    validate_list_method(fs_credit_scoring_local, m)


In [None]:
feast_list_functions = [
    "list_feature_services",
    # "list_permissions",
    "list_saved_datasets",
]

# validates feast methods exists and type is valid
def validate_list_func(fs_obj, method_name):
    assert hasattr(fs_obj, method_name), f"Method not found: {method_name}"

    method = getattr(fs_obj, method_name)

    result = method()

    assert isinstance(result, list), (
        f"{method_name}() must return a list, got {type(result)}"
    )

for m in feast_list_functions:
    validate_list_func(fs_credit_scoring_local, m)

In [None]:
# validate_list_data_sources for with and without permissions 

import os
from feast.errors import FeastPermissionError

def validate_list_data_sources(fs_obj):
    """
    Validates list_data_sources() with special handling for Kubernetes auth mode.
    If CONFIGMAP_DATA indicates auth=kubernetes, expect FeastPermissionError.
    Otherwise validate output type normally.
    """
    auth_mode = os.getenv("CONFIGMAP_DATA")

    # Case 1: Kubernetes auth ‚Üí expect permission error
    if "kubernetes" in auth_mode.lower():
        try:
            fs_obj.list_data_sources()
            raise AssertionError(
                "Expected FeastPermissionError due to Kubernetes auth, but the call succeeded."
            )
        except FeastPermissionError as e:
            # Correct, this is expected
            return
        except Exception as e:
            raise AssertionError(
                f"Expected FeastPermissionError, but got different exception: {type(e)} - {e}"
            )

    # Case 2: Non-Kubernetes auth ‚Üí normal path
    assert hasattr(fs_obj, "list_data_sources"), "Method not found: list_data_sources"
    result = fs_obj.list_data_sources()
    assert isinstance(result, list), (
        f"list_data_sources() must return a list, got {type(result)}"
    )


In [None]:

entity = fs_credit_scoring_local.get_entity("dob_ssn")

assert entity is not None, "‚ùå Entity 'dob_ssn' not found!"
assert entity.name == "dob_ssn", f"‚ùå Entity name mismatch: {entity.name}"

print("‚úì Entity validation successful!\n", entity.name)

In [None]:
import os
from feast.errors import FeastPermissionError

def validate_get_data_source(fs_obj, name: str):
    auth_mode = os.getenv("CONFIGMAP_DATA", "")

    print("üìå CONFIGMAP_DATA:", auth_mode)

    # If Kubernetes auth is enabled ‚Üí expect permission error
    if "auth" in "kubernetes" in auth_mode.lower():
        print(f"üîí Kubernetes auth detected, expecting permission error for get_data_source('{name}')")

        try:
            fs_obj.get_data_source(name)
            raise AssertionError(
                f"‚ùå Expected FeastPermissionError when accessing data source '{name}', but call succeeded"
            )

        except FeastPermissionError as e:
            print(f"‚úÖ Correctly blocked with FeastPermissionError: {e}")
            return

        except Exception as e:
            raise AssertionError(
                f"‚ùå Expected FeastPermissionError but got {type(e)}: {e}"
            )

    # Otherwise ‚Üí normal validation
    print(f"üîç Fetching data source '{name}'...")

    ds = fs_obj.get_data_source(name)

    print("\nüìå Data Source Object:")
    print(ds)

    assert ds.name == name, (
        f"‚ùå Expected name '{name}', got '{ds.name}'"
    )

    print(f"‚úÖ Data source '{name}' exists and is correctly configured.")


In [None]:
import time

feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
]

entity_rows = [{
    "zipcode": 1463,
    "dob_ssn": "19530219_5179"
}]

response = None
last_error = None

for attempt in range(1, 4):
    try:
        response = fs_credit_scoring_local.get_online_features(
            features=feast_features,
            entity_rows=entity_rows,
        ).to_dict()
        break
    except Exception as exc:
        last_error = exc
        if attempt < 3:
            print(f"‚ö†Ô∏è Online read failed (attempt {attempt}/3): {exc}")
            time.sleep(5)
        else:
            raise

assert response is not None, f"Online feature read failed: {last_error}"

print("Actual response:", response)

expected = {
    'zipcode': [1463],
    'dob_ssn': ['19530219_5179'],
    'city': ['PEPPERELL'],
    'state': ['MA'],
}

assert response == expected

In [None]:
# Offline store write
import os
import pandas as pd
from feast.errors import FeastPermissionError

write_feature_view = "zipcode_features"
write_location_type = "PRIMARY_WRITE_TEST"
write_total_wages = 310246739

offline_base_write_data = {
    "zipcode": [1463],
    "city": ["PEPPERELL"],
    "state": ["MA"],
    "location_type": [write_location_type],
    "tax_returns_filed": [5549],
    "population": [10100],
    "total_wages": [write_total_wages],
    "event_timestamp": [pd.Timestamp("2017-01-01 12:00:00+00:00")],
    "created_timestamp": [pd.Timestamp("2017-01-01 12:00:00+00:00")],
}

# Offline store requires created_timestamp in this repo
offline_write_df = pd.DataFrame(offline_base_write_data)

auth_mode = os.getenv("CONFIGMAP_DATA", "")
if "kubernetes" in auth_mode.lower():
    try:
        fs_credit_scoring_local.write_to_offline_store(
            feature_view_name=write_feature_view,
            df=offline_write_df,
            allow_registry_cache=False,
            reorder_columns=True,
        )
        raise AssertionError("‚ùå Expected FeastPermissionError for offline write, but call succeeded")
    except FeastPermissionError as exc:
        print(f"‚úÖ Correctly blocked offline write with FeastPermissionError: {exc}")
else:
    fs_credit_scoring_local.write_to_offline_store(
        feature_view_name=write_feature_view,
        df=offline_write_df,
        allow_registry_cache=False,
        reorder_columns=True,
    )
    print(f"‚úÖ write_to_offline_store() executed for '{write_feature_view}'")


In [None]:

import os
import pandas as pd
from datetime import timedelta
from feast import FeatureView
from feast.errors import FeastPermissionError
from feast.types import Int64
from feast.field import Field
from feast.entity import Entity
from feast.value_type import ValueType
from feast.infra.offline_stores.file_source import FileSource
from feast.data_format import ParquetFormat

# ---- Define Entity (REQUIRED) ----
dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth + last four digits of SSN",
)

# ---- FileSource WITHOUT created_timestamp ----
credit_history_source = FileSource(
    name="Credit history",
    path="data/credit_history.parquet",
    file_format=ParquetFormat(),
    timestamp_field="event_timestamp",
)

# ---- FeatureView (use Entity object) ----
credit_history = FeatureView(
    name="credit_history",
    entities=[dob_ssn],   # ‚úÖ correct
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)

auth_mode = os.getenv("CONFIGMAP_DATA", "")
if "kubernetes" in auth_mode.lower():
    try:
        fs_credit_scoring_local.apply([dob_ssn, credit_history])
        raise AssertionError("‚ùå Expected FeastPermissionError for apply, but call succeeded")
    except FeastPermissionError as exc:
        print(f"‚úÖ Correctly blocked apply with FeastPermissionError: {exc}")
else:
    # ---- Apply FeatureView ----
    fs_credit_scoring_local.apply([dob_ssn, credit_history])

    # ---- Online write (NO created_timestamp) ----
    ts = pd.Timestamp("2020-04-26 18:01:04.746575")

    online_write_df = pd.DataFrame({
        "dob_ssn": ["19530219_5179"],
        "credit_card_due": [8419],
        "mortgage_due": [91803],
        "student_loan_due": [22328],
        "vehicle_loan_due": [15078],
        "hard_pulls": [0],
        "missed_payments_2y": [1],
        "missed_payments_1y": [0],
        "missed_payments_6m": [0],
        "bankruptcies": [0],
        "event_timestamp": [ts],
    })

    fs_credit_scoring_local.write_to_online_store(
        feature_view_name="credit_history",
        df=online_write_df,
        allow_registry_cache=False,
    )

    print("‚úÖ Online store write succeeded")

    # ---- Verify ----
    online_df = fs_credit_scoring_local.get_online_features(
        features=[
            "credit_history:credit_card_due",
            "credit_history:mortgage_due",
        ],
        entity_rows=[{"dob_ssn": "19530219_5179"}],
    ).to_df()

    print(online_df)


In [None]:
import os
from feast.errors import FeastPermissionError

feature_view_name = "credit_history"

auth_mode = os.getenv("CONFIGMAP_DATA", "")
if "kubernetes" in auth_mode.lower():
    try:
        fs_credit_scoring_local.delete_feature_view(feature_view_name)
        raise AssertionError("‚ùå Expected FeastPermissionError for delete_feature_view, but call succeeded")
    except FeastPermissionError as exc:
        print(f"‚úÖ Correctly blocked delete_feature_view with FeastPermissionError: {exc}")
else:
    fs_credit_scoring_local.delete_feature_view(feature_view_name)

    # Verify deletion via list and get calls
    remaining_fvs = [fv.name for fv in fs_credit_scoring_local.list_feature_views()]
    assert feature_view_name not in remaining_fvs, (
        f"‚ùå FeatureView '{feature_view_name}' still present after deletion"
    )

    try:
        fs_credit_scoring_local.get_feature_view(feature_view_name)
        raise AssertionError(
            f"‚ùå Expected get_feature_view('{feature_view_name}') to fail after deletion"
        )
    except Exception as exc:
        print(f"‚úÖ Deletion verified for FeatureView '{feature_view_name}': {type(exc).__name__}")


In [None]:
odfv_name = "total_debt_calc"
odfv = fs_credit_scoring_local.get_on_demand_feature_view(odfv_name)

assert odfv is not None, f"‚ùå get_on_demand_feature_view('{odfv_name}') returned None"

if isinstance(odfv, dict):
    odfv_name_returned = odfv.get("spec", {}).get("name")
    odfv_feature_names = [
        f.get("spec", {}).get("name")
        for f in odfv.get("spec", {}).get("features", [])
    ]
    odfv_write_to_online = odfv.get("spec", {}).get("write_to_online_store")
else:
    odfv_name_returned = getattr(odfv, "name", None)
    if not odfv_name_returned and hasattr(odfv, "spec") and hasattr(odfv.spec, "name"):
        odfv_name_returned = odfv.spec.name
    odfv_feature_names = [f.name for f in getattr(odfv, "features", [])]
    odfv_write_to_online = getattr(odfv, "write_to_online_store", None)

assert odfv_name_returned == odfv_name, (
    f"‚ùå Expected ODFV '{odfv_name}', but got '{odfv_name_returned}'"
)
assert "total_debt_due" in odfv_feature_names, (
    f"‚ùå ODFV '{odfv_name}' missing 'total_debt_due' feature"
)
assert odfv_write_to_online is False, (
    f"‚ùå ODFV '{odfv_name}' write_to_online_store expected False"
)

print(f"‚úÖ get_on_demand_feature_view('{odfv_name}')")