In [13]:
import numpy as np
import random
import pandas as pd
from pyspark.sql import Row
from hops import featurestore

In [14]:
area_ids = list(range(1,10))

In [15]:
house_sizes = []
house_worths = []
house_ages = []
house_area_ids = []
for i in area_ids:
    for j in list(range(1,10)):
        house_sizes.append(abs(np.random.normal()*1000)/i)
        house_worths.append(abs(np.random.normal()*10000)/i)
        house_ages.append(abs(np.random.normal()*10000)/i)
        house_area_ids.append(i)
house_ids = list(range(len(house_area_ids)))
houses_for_sale_data  = pd.DataFrame({
        'area_id':house_area_ids,
        'house_id':house_ids,
        'house_worth': house_worths,
        'house_age': house_ages,
        'house_size': house_sizes
    })
houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)

In [16]:
sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").sum()
count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").count()
sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, "area_id")
sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \
    .withColumnRenamed("sum(house_age)", "sum_house_age") \
    .withColumnRenamed("sum(house_worth)", "sum_house_worth") \
    .withColumnRenamed("sum(house_size)", "sum_house_size") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_house_for_sale(row):
    avg_house_worth = row.sum_house_worth/float(row.num_rows)
    avg_house_size = row.sum_house_size/float(row.num_rows)
    avg_house_age = row.sum_house_age/float(row.num_rows)
    return Row(
        sum_house_worth=row.sum_house_worth, 
        sum_house_age=row.sum_house_age,
        sum_house_size=row.sum_house_size,
        area_id = row.area_id,
        avg_house_worth = avg_house_worth,
        avg_house_size = avg_house_size,
        avg_house_age = avg_house_age
       )
houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(
    lambda row: compute_average_features_house_for_sale(row)
).toDF()

In [17]:
house_purchased_amounts = []
house_purchases_bidders = []
house_purchases_area_ids = []
for i in area_ids:
    for j in list(range(1,10)):
        house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)
        house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))
        house_purchases_area_ids.append(i)
house_purchase_ids = list(range(len(house_purchases_bidders)))
houses_sold_data  = pd.DataFrame({
        'area_id':house_purchases_area_ids,
        'house_purchase_id':house_purchase_ids,
        'number_of_bidders': house_purchases_bidders,
        'sold_for_amount': house_purchased_amounts
    })
houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)

In [18]:
sum_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").sum()
count_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").count()
sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, "area_id")
sum_count_houses_sold_df = sum_count_houses_sold_df \
    .withColumnRenamed("sum(number_of_bidders)", "sum_number_of_bidders") \
    .withColumnRenamed("sum(sold_for_amount)", "sum_sold_for_amount") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_houses_sold(row):
    avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)
    avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)
    return Row(
        sum_number_of_bidders=row.sum_number_of_bidders, 
        sum_sold_for_amount=row.sum_sold_for_amount,
        area_id = row.area_id,
        avg_num_bidders = avg_num_bidders,
        avg_sold_for = avg_sold_for
       )
houses_sold_features_df = sum_count_houses_sold_df.rdd.map(
    lambda row: compute_average_features_houses_sold(row)
).toDF()

In [19]:
featurestore.create_featuregroup(
    houses_for_sale_features_df,
    "test_houses_for_sale_featuregroup",
    description="aggregate features of houses for sale per area",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use demo_deep_learning_admin000_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [20]:
featurestore.create_featuregroup(
    houses_sold_features_df,
    "test_houses_sold_featuregroup",
    description="aggregate features of sold houses per area",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

Registering feature metadata...
Registering feature metadata... [COMPLETE]
Writing feature data to offline feature group (Hive)...
Running sql: use demo_deep_learning_admin000_featurestore against offline feature store
Writing feature data to offline feature group (Hive)... [COMPLETE]
Feature group created successfully

In [21]:
features_df = featurestore.get_features(["avg_house_age", "avg_house_size", 
                                         "avg_house_worth", "avg_num_bidders", 
                                         "avg_sold_for"], 
                                        featurestore=featurestore.project_featurestore(),
                                        featuregroups_version_dict={"test_houses_for_sale_featuregroup": 1,
                                        "test_houses_sold_featuregroup": 1})

Running sql: use demo_deep_learning_admin000_featurestore against offline feature store
Logical query plan for getting 5 features from the featurestore created successfully
SQL string for the query created successfully
Running sql: SELECT avg_sold_for, avg_house_age, avg_house_size, avg_num_bidders, avg_house_worth FROM test_houses_for_sale_featuregroup_1 JOIN test_houses_sold_featuregroup_1 ON test_houses_for_sale_featuregroup_1.`area_id`=test_houses_sold_featuregroup_1.`area_id` against offline feature store

In [22]:
featurestore.create_training_dataset(
    features_df, "predict_house_sold_for_dataset",
    data_format="tfrecords",
    descriptive_statistics=False,
    feature_correlation=False,
    feature_histograms=False,
    cluster_analysis=False
)

write feature frame, write_mode: overwrite
Training Dataset created successfully