In [1]:
import hopsworks
from hsfs.feature import Feature
import snowflake.connector

proj = hopsworks.login(project="kimberly")
fs = proj.get_feature_store()

  from .autonotebook import tqdm as notebook_tqdm


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/644244
Connected. Call `.close()` to terminate connection gracefully.


In [2]:
sc = fs.get_storage_connector("snowflake_github")

In [None]:

query_str = """
WITH latest_repo_name AS (
    SELECT repo_name,
           repo_id
    FROM cybersyn.github_repos
    QUALIFY ROW_NUMBER() OVER (PARTITION BY repo_id ORDER BY first_seen DESC) = 1
)
SELECT LOWER(repo.repo_name) as repo_name,
       SUM(stars.count) AS sum_stars
FROM cybersyn.github_stars AS stars
JOIN latest_repo_name AS repo
    ON (repo.repo_id = stars.repo_id)
WHERE stars.date >= DATEADD('day', -365, DATE('2023-11-13'))
GROUP BY repo.repo_name, repo.repo_id
ORDER BY sum_stars DESC NULLS LAST
LIMIT 100;"""

features = [
    Feature(name="repo_name",type="string"),
    Feature(name="sum_stars",type="int")
]

github_most_starts_fg = fs.create_external_feature_group(
    name="github_most_stars",
    version=1,
    description="The Github repos that got the most stars last year",
    primary_key=['repo_name'],
    query=query_str,
    storage_connector=sc,
    features=features
)

github_most_starts_fg.save()

In [None]:
sc = snowflake_github

In [None]:
# Snowflake connection parameters
conn = snowflake.connector.connect(
    user=sc.user,
    password=sc.password,
    account=sc.account,
    warehouse=sc.warehouse,
    database="CHICAGO_DIVVY_BIKE_STATION_STATUS",
    schema="PUBLIC"
)

# SQL query to fetch the data
query = "SELECT * FROM STATION_INFO_FLATTEN"

# Execute the query
cur = conn.cursor()
cur.execute(query)
rows = cur.fetchall()

# Convert to DataFrame
import pandas as pd
df = pd.DataFrame(rows, columns=[x[0] for x in cur.description])

# Close the cursor and connection
cur.close()
conn.close()
df

In [None]:
fg_cbs = fs.get_or_create_feature_group(name="chicago_bike_stations",
                                    version=1,
                                    description="Chicago bike station details",
                                    primary_key=["station_id"]
                                   )
fg_cbs.insert(df)

In [4]:
conn = snowflake.connector.connect(
    user=sc.user,
    password=sc.password,
    account=sc.account,
    warehouse=sc.warehouse,
    database="CHICAGO_DIVVY_BIKE_STATION_STATUS",
    schema="PUBLIC"
)

query = """
    SELECT STATION_ID as id
        , STATION_STATUS as station_status
        , NUM_BIKES_AVAILABLE as num_bikes_available
        , NUM_EBIKES_AVAILABLE as num_ebikes_available
        , LAST_UPDATED as last_updated
    FROM STATION_STATUS_FLATTEN_FULL ORDER BY last_updated LIMIT 50000 
"""
# Execute the query
cur = conn.cursor()
cur.execute(query)
rows = cur.fetchall()

# Convert to DataFrame
import pandas as pd
df2 = pd.DataFrame(rows, columns=[x[0] for x in cur.description])

# Close the cursor and connection
cur.close()
conn.close()
df2

Unnamed: 0,ID,STATION_STATUS,NUM_BIKES_AVAILABLE,NUM_EBIKES_AVAILABLE,LAST_UPDATED
0,"""331""","""active""",4,0,2021-10-20 19:52:20
1,"""693""","""active""",5,1,2021-10-20 19:52:20
2,"""51""","""active""",38,5,2021-10-20 19:52:20
3,"""674""","""active""",2,0,2021-10-20 19:52:20
4,"""16""","""active""",7,1,2021-10-20 19:52:20
...,...,...,...,...,...
49995,"""1448642162257565290""","""active""",0,0,2021-10-20 20:55:35
49996,"""641""","""active""",2,0,2021-10-20 20:55:35
49997,"""556""","""active""",1,0,2021-10-20 20:55:35
49998,"""154""","""active""",3,0,2021-10-20 20:55:35


In [6]:
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

# Create an Expectation Suite
expectation_suite = ExpectationSuite(
    expectation_suite_name="transaction_suite")

expectation_suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"num_bikes_available",
            "min_value": 0
        }
    )
)

expectation_suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column":"id"}
    )
)

{"expectation_type": "expect_column_values_to_not_be_null", "meta": {}, "kwargs": {"column": "id"}}

In [7]:
fg_cbs = fs.get_or_create_feature_group(name="chicago_bike_station_status",
                                    version=1,
                                    description="Chicago bike station details",
                                    primary_key=["id"],
                                    event_time="last_updated",
                                    online_enabled=True,
                                    expectation_suite=expectation_suite
                                   )
fg_cbs.insert(df2)



Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/644244/fs/640067/fg/727439
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/644244/fs/640067/fg/727439


Uploading Dataframe: 100.00% |████████████████████| Rows 50000/50000 | Elapsed Time: 00:09 | Remaining Time: 00:00


Launching job: chicago_bike_station_status_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/644244/jobs/named/chicago_bike_station_status_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7f73703c1570>,
 {
   "success": false,
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 2,
     "successful_expectations": 1,
     "unsuccessful_expectations": 1,
     "success_percent": 50.0
   },
   "meta": {
     "great_expectations_version": "0.15.12",
     "expectation_suite_name": "transaction_suite",
     "run_id": {
       "run_name": null,
       "run_time": "2024-04-17T19:50:25.740814+00:00"
     },
     "batch_kwargs": {
       "ge_batch_id": "bc0eb95e-fcf3-11ee-b0a8-00155d1167e0"
     },
     "batch_markers": {},
     "batch_parameters": {},
     "validation_time": "20240417T195025.740704Z",
     "expectation_suite_meta": {
       "great_expectations_version": "0.15.12"
     }
   },
   "results": [
     {
       "exception_info": {
         "raised_exception": true,
         "exception_message": "TypeError: '>=' not supported between instances of 'str' and 'int'",
         "exception_traceback": "Traceback (most rece