In [63]:
# Importing necessary libraries
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import io
from time import gmtime, strftime, sleep
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_store import FeatureStore
import time
from datetime import datetime, timezone

In [3]:
# Ensuring boto3 version is above 1.17.21
original_boto3_version = boto3.__version__
%pip install 'boto3>1.17.21'

Note: you may need to restart the kernel to use updated packages.


In [4]:
# Building region-pinned AWS clients and wrapping them in a SageMaker session
# that the Feature Store helpers can use.
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

client_kwargs = dict(region_name=region)
sagemaker_client = boto_session.client(service_name="sagemaker", **client_kwargs)
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", **client_kwargs
)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
    sagemaker_client=sagemaker_client,
)

In [5]:
# Setting default S3 bucket: the session's default SageMaker bucket; notebook
# artifacts are written under `prefix` inside it.
prefix = "sagemaker-assignment3.1"
default_s3_bucket_name = feature_store_session.default_bucket()

print(f"{default_s3_bucket_name}")

sagemaker-us-east-1-851725636446


In [6]:
# Setting up IAM role: this ARN is passed as role_arn when the feature group
# is created further down.
role = get_execution_role()
print(f"{role}")

arn:aws:iam::851725636446:role/LabRole


In [13]:
# Loading the raw housing data and its Google Maps enrichment from the
# assignment folder
DATASET_DIR = "Assignment3.1"
housing_data = pd.read_csv(f"{DATASET_DIR}/housing.csv")
housing_gmaps_data = pd.read_csv(f"{DATASET_DIR}/housing_gmaps_data_raw.csv")

In [14]:
housing_data.head(n=5)  # preview the first rows of the housing dataset

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [15]:
housing_gmaps_data.head(n=5)  # preview the first rows of the gmaps dataset

Unnamed: 0,street_number,route,locality-political,administrative_area_level_2-political,administrative_area_level_1-political,country-political,postal_code,address,longitude,latitude,neighborhood-political,postal_code_suffix,establishment-point_of_interest-transit_station,establishment-park-point_of_interest,premise,establishment-point_of_interest-subway_station-transit_station,airport-establishment-finance-moving_company-point_of_interest-storage,subpremise,bus_station-establishment-point_of_interest-transit_station,establishment-park-point_of_interest-tourist_attraction,establishment-natural_feature,airport-establishment-point_of_interest,political-sublocality-sublocality_level_1,administrative_area_level_3-political,post_box,establishment-light_rail_station-point_of_interest-transit_station,establishment-point_of_interest,aquarium-establishment-park-point_of_interest-tourist_attraction-zoo,campground-establishment-lodging-park-point_of_interest-rv_park-tourist_attraction,cemetery-establishment-park-point_of_interest
0,3130,Grizzly Peak Boulevard,Berkeley,Alameda County,California,United States,94705.0,"3130 Grizzly Peak Blvd, Berkeley, CA 94705, USA",-122.23,37.88,,,,,,,,,,,,,,,,,,,,
1,2005,Tunnel Road,Oakland,Alameda County,California,United States,94611.0,"2005 Tunnel Rd, Oakland, CA 94611, USA",-122.22,37.86,Merriewood,1021.0,,,,,,,,,,,,,,,,,,
2,6886,Chabot Road,Oakland,Alameda County,California,United States,94618.0,"6886 Chabot Rd, Oakland, CA 94618, USA",-122.24,37.85,Upper Rockridge,,,,,,,,,,,,,,,,,,,
3,6365,Florio Street,Oakland,Alameda County,California,United States,94618.0,"6365 Florio St, Oakland, CA 94618, USA",-122.25,37.85,Rockridge,1335.0,,,,,,,,,,,,,,,,,,
4,5407,Bryant Avenue,Oakland,Alameda County,California,United States,94618.0,"5407 Bryant Ave, Oakland, CA 94618, USA",-122.25,37.84,Rockridge,1431.0,,,,,,,,,,,,,,,,,,


In [19]:
# Checking how sparse the neighborhood label is in the gmaps dataset
neighborhood_is_missing = housing_gmaps_data["neighborhood-political"].isna()
nan_count = neighborhood_is_missing.sum()
total_count = housing_gmaps_data.shape[0]
print(f"NaN neighborhood-political values: {nan_count}/{total_count}")

NaN neighborhood-political values: 8413/12590


In [20]:
# Imputing neighborhood-political with a sentinel, since most rows lack it
# (see the count above).
# NOTE(review): this column later becomes primary_key, so every unlabelled row
# ends up under the single record identifier "Unknown" — confirm intended.
housing_gmaps_data = housing_gmaps_data.fillna({"neighborhood-political": "Unknown"})

In [21]:
# Rounding longitude and latitude to 4 decimals in both frames so the merge
# keys line up exactly
for frame in (housing_data, housing_gmaps_data):
    for coord in ("longitude", "latitude"):
        frame[f"{coord}_rounded"] = frame[coord].round(4)

In [22]:
# Merging datasets: left-join the gmaps attributes onto every housing row by
# rounded coordinates (housing columns keep their names; clashing gmaps
# columns get a "_gmaps" suffix).
merged = housing_data.merge(
    housing_gmaps_data,
    on=["longitude_rounded", "latitude_rounded"],
    how="left",
    suffixes=("", "_gmaps"),
)

# The earlier "Unknown" imputation only covered rows present in
# housing_gmaps_data; housing rows with no coordinate match come out of the
# left join with NaN here. This column becomes primary_key (the feature group's
# record identifier), and groupby drops NaN keys by default, so re-impute.
merged["neighborhood-political"] = merged["neighborhood-political"].fillna("Unknown")

# Ensuring that neighborhood-political is present in merged
print(merged[["longitude", "latitude", "neighborhood-political"]].head())

   longitude  latitude neighborhood-political
0    -122.23     37.88                Unknown
1    -122.22     37.86             Merriewood
2    -122.24     37.85        Upper Rockridge
3    -122.25     37.85              Rockridge
4    -122.25     37.85              Rockridge


In [28]:
# Creating primary key and event time
# NOTE(review): neighborhood-political was imputed with "Unknown", so all
# unlabelled rows share one record identifier and the per-neighborhood
# aggregates below pool them together — confirm this is intended.
merged["primary_key"] = merged["neighborhood-political"]

# Feature Store String event times are documented as ISO-8601 UTC in the form
# yyyy-MM-dd'T'HH:mm:ssZ; isoformat() would emit microseconds and "+00:00".
merged["event_time"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

In [46]:
# One-Hot Encoding ocean_proximity into fixed, lower-case indicator columns.
# The category set is pinned, so every expected column exists even if a
# category is absent from the data (its column is all False). Existing
# indicator columns are dropped before concatenating, so re-running this cell
# does not create duplicate columns.
OCEAN_PROXIMITY_COLUMNS = {
    "<1H OCEAN": "<1h ocean",
    "INLAND": "inland",
    "ISLAND": "island",
    "NEAR BAY": "near bay",
    "NEAR OCEAN": "near ocean",
}

ocean_dummies = (
    pd.get_dummies(merged["ocean_proximity"])
    .reindex(columns=list(OCEAN_PROXIMITY_COLUMNS), fill_value=False)
    .rename(columns=OCEAN_PROXIMITY_COLUMNS)
)
merged = pd.concat(
    [merged.drop(columns=list(ocean_dummies.columns), errors="ignore"), ocean_dummies],
    axis=1,
)

In [47]:
# Creating median house value: mean of median_house_value within each
# primary_key, capped at 500,000
HOUSE_VALUE_CAP = 500000
neighborhood_mean_value = merged.groupby("primary_key")["median_house_value"].transform("mean")
merged["median house value"] = neighborhood_mean_value.clip(upper=HOUSE_VALUE_CAP)

In [48]:
# Creating median house age: each neighborhood's mean housing_median_age,
# labelled by the decade it falls in (e.g. 41.0 -> "40-49")
def _decade_label(age):
    decade_start = int(age // 10) * 10
    return f"{decade_start}-{decade_start + 9}"


neighborhood_mean_age = merged.groupby("primary_key")["housing_median_age"].transform("mean")
merged["median house age"] = neighborhood_mean_age.map(_decade_label)

In [49]:
# Creating total households: each neighborhood's mean household count,
# rounded up to a whole number
neighborhood_mean_households = merged.groupby("primary_key")["households"].transform("mean")
merged["total households"] = neighborhood_mean_households.pipe(np.ceil).astype(int)

In [50]:
# Creating bedrooms per household

# Computing per postal-code average of total_bedrooms, used to fill gaps
postal_avg = (
    merged.groupby("postal_code")["total_bedrooms"]
    .transform("mean")
)

# Imputing missing total_bedrooms; rows with no usable postal-code average
# stay NaN
merged["total_bedrooms"] = merged["total_bedrooms"].fillna(postal_avg)

# Computing bedrooms per household per row, then averaging within each
# neighborhood so this is a neighborhood-level value like the other
# transform("mean") features. Without the groupby, the later
# drop_duplicates(subset=["primary_key"]) would keep whichever row's ratio
# happens to come first.
row_bedrooms_per_household = merged["total_bedrooms"] / merged["households"]
merged["bedrooms per household"] = (
    row_bedrooms_per_household.groupby(merged["primary_key"]).transform("mean")
)

In [54]:
# Selecting final feature group columns
# Dropping repeated column labels (presumably left over from re-running
# earlier cells) so each selected column is unambiguous.
merged = merged.loc[:, ~merged.columns.duplicated()]

# One row per primary_key. NOTE(review): the aggregated features are constant
# within a neighborhood, but the ocean-proximity flags are per-row, so they
# come from the first row of each neighborhood.
neighborhood_features = (
    merged[[
        "primary_key", "event_time",
        "<1h ocean", "inland", "island", "near bay", "near ocean",
        "median house value", "median house age", "total households", "bedrooms per household",
    ]]
    .drop_duplicates(subset=["primary_key"])
    # Independent copy: later cells rename and cast columns on this frame,
    # which on a drop_duplicates slice can trigger SettingWithCopyWarning.
    .copy()
)

In [59]:
# Renaming columns to valid feature names: spaces become underscores, and
# "<1h ocean" gets an explicit alphanumeric name.
rename_map = {"<1h ocean": "lt1h_ocean"}
rename_map.update(
    (label, label.replace(" ", "_"))
    for label in (
        "inland", "island", "near bay", "near ocean",
        "median house value", "median house age",
        "total households", "bedrooms per household",
    )
)

neighborhood_features = neighborhood_features.rename(columns=rename_map)

In [61]:
# Creating feature group in SageMaker
# Reusing the session built earlier from the region-pinned SageMaker and
# Feature Store runtime clients rather than constructing a second default
# sagemaker.Session().
session = feature_store_session
feature_group = FeatureGroup(name="neighborhood-feature-group", sagemaker_session=session)

# Converting bool columns to int (0/1) so they are inferred as Integral features
bool_cols = ["lt1h_ocean", "inland", "island", "near_bay", "near_ocean"]
neighborhood_features[bool_cols] = neighborhood_features[bool_cols].astype(int)

# Inferring feature names and types from the DataFrame's columns and dtypes
feature_group.load_feature_definitions(data_frame=neighborhood_features)


def wait_for_feature_group_creation_complete(feature_group, poll_interval=5, timeout=None):
    """Block until ``feature_group`` leaves the "Creating" state.

    Polls ``feature_group.describe()`` every ``poll_interval`` seconds.

    Args:
        feature_group: object exposing ``describe()`` (returning a dict with
            "FeatureGroupStatus") and a ``name`` attribute.
        poll_interval: seconds to sleep between polls (default 5).
        timeout: maximum seconds to wait while the status is "Creating";
            ``None`` (default) waits indefinitely.

    Raises:
        TimeoutError: still "Creating" after ``timeout`` seconds.
        RuntimeError: the final status is anything other than "Created"; the
            message includes the status and any reported FailureReason.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature group {feature_group.name} still Creating after {timeout} seconds"
            )
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        reason = description.get("FailureReason", "no FailureReason reported")
        raise RuntimeError(
            f"Failed to create feature group {feature_group.name}: "
            f"status={status}, reason={reason}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")


# S3 location for the feature group's offline store
offline_store_s3_uri = f"s3://{default_s3_bucket_name}/{prefix}"

feature_group.create(
    s3_uri=offline_store_s3_uri,
    record_identifier_name="primary_key",
    event_time_feature_name="event_time",
    role_arn=role,
    enable_online_store=True,
)

# Blocking until the feature group leaves the "Creating" state
wait_for_feature_group_creation_complete(feature_group=feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup neighborhood-feature-group successfully created.


In [62]:
# Ingesting the neighborhood features with 3 parallel workers; wait=True
# blocks until ingestion finishes.
feature_group.ingest(
    data_frame=neighborhood_features,
    max_workers=3,
    wait=True,
)

IngestionManagerPandas(feature_group_name='neighborhood-feature-group', feature_definitions={'primary_key': {'FeatureName': 'primary_key', 'FeatureType': 'String'}, 'event_time': {'FeatureName': 'event_time', 'FeatureType': 'String'}, 'lt1h_ocean': {'FeatureName': 'lt1h_ocean', 'FeatureType': 'Integral'}, 'inland': {'FeatureName': 'inland', 'FeatureType': 'Integral'}, 'island': {'FeatureName': 'island', 'FeatureType': 'Integral'}, 'near_bay': {'FeatureName': 'near_bay', 'FeatureType': 'Integral'}, 'near_ocean': {'FeatureName': 'near_ocean', 'FeatureType': 'Integral'}, 'median_house_value': {'FeatureName': 'median_house_value', 'FeatureType': 'Fractional'}, 'median_house_age': {'FeatureName': 'median_house_age', 'FeatureType': 'String'}, 'total_households': {'FeatureName': 'total_households', 'FeatureType': 'Integral'}, 'bedrooms_per_household': {'FeatureName': 'bedrooms_per_household', 'FeatureType': 'Fractional'}}, sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0

In [85]:
# Creating queries
neighborhood_fg = FeatureGroup(name="neighborhood-feature-group", sagemaker_session=session)

# Creating an AthenaQuery object for this feature group
neighborhood_query = neighborhood_fg.athena_query()
neighborhood_table = neighborhood_query.table_name
print("Athena table:", neighborhood_table)

# Neighborhoods to look up; edit this list rather than the SQL below.
QUERY_NEIGHBORHOODS = ["Brooktree", "Fisherman's Wharf", "Los Osos"]


def _sql_string_literal(value):
    """Return ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


# Building query string for the neighborhoods
# NOTE(review): if records are re-ingested with a new event_time, the offline
# table may hold several versions per primary_key — consider keeping only the
# latest event_time per key.
in_list = ", ".join(_sql_string_literal(name) for name in QUERY_NEIGHBORHOODS)
query_string = f"""
SELECT *
FROM "{neighborhood_query.database}"."{neighborhood_table}"
WHERE primary_key IN ({in_list})
"""

print("Running query:", query_string)

# Running the Athena query and writing results to S3
neighborhood_query.run(
    query_string=query_string,
    output_location=f"s3://{default_s3_bucket_name}/{prefix}/query_results/",
)

# Waiting for query to complete
neighborhood_query.wait()

# Loading results into a Pandas DataFrame (shown via the notebook's rich display)
neighborhood_df = neighborhood_query.as_dataframe()
neighborhood_df

Athena table: neighborhood_feature_group_1758603775
Running query: 
SELECT *
FROM "sagemaker_featurestore"."neighborhood_feature_group_1758603775"
WHERE primary_key IN ('Brooktree', 'Fisherman''s Wharf', 'Los Osos')

         primary_key  ... is_deleted
0  Fisherman's Wharf  ...      False
1          Brooktree  ...      False
2           Los Osos  ...      False

[3 rows x 14 columns]
