## 0.Spark Magic basic configuration

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars": "hdfs:///user/hadoop/spark-tfrecord_2.12-0.3.0.jar, hdfs:///user/hadoop/httpclient-4.5.9.jar, hdfs:///user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar",
        "spark.submit.pyFiles": "s3://emrfssampledata/conf/emr_fs.zip",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
}

## 1.test feature group creation

In [3]:
import sys
from emr_fs.feature_store import FeatureStore
from emr_fs.client import Client

# Open a client and attach to the feature store shared by the cells below.
client = Client('kernal')
emr_fs01 = client.connect_to_feature_store("emr_feature_store")

# Alternative: fetch the group instead of creating it.
# emr_fg01 = emr_fs01.get_feature_group("customer_base")

# Feature name -> type declaration for the "customer_base" feature group.
features01 = {
    "customer_id": "int",
    "city_code": "int",
    "state_code": "string",
    "country_code": "string",
    "dt": "string",
}

# Positional arguments: group name, "" (meaning defined by emr_fs -- TODO confirm),
# unique key column, partition key column, feature declaration.
emr_fg01 = client.create_feature_group(
    "customer_base",
    "",
    "customer_id",
    "dt",
    features01,
)
emr_fg01.print_info()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

created feature group:customer_base
*************feature group details*************
feature store name:emr_feature_store
feature group name:customer_base
feature_unique_key:customer_id
feature_partition_key:dt
all features:
   customer_id:int
   city_code:int
   state_code:string
   country_code:string
   dt:string
***********************************************

## 2.test connect to existing feature group

In [4]:
   # Fetch the "customer_advance" feature group from the connected store
   # and print its details.
   emr_fg02 = emr_fs01.get_feature_group("customer_advance")
   emr_fg02.print_info()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

*************feature group details*************
feature store name:emr_feature_store
feature group name:customer_advance
feature_unique_key:customer_id
feature_partition_key:dt
all features:
   dt:STRING
   customer_id:INT
   age:INT
   diabetes:STRING
   ejection_fraction:STRING
   high_blood_pressure:STRING
   platelets:STRING
   sex:STRING
   smoking:STRING
   death_event:STRING
***********************************************

## 3.test feature group ingestion

In [5]:
   # Ingest the base customer CSV into the customer_base feature group.
   # NOTE(review): the options logged by this call report
   # 'hoodie.datasource.write.operation': 'upsert' even though "overwrite" is
   # passed here -- confirm how emr_fs maps the mode argument.
   source_feature_group_dataset = "s3://emrfssampledata/feature_store_customer_base.csv"
   emr_fg01.ingestion(source_feature_group_dataset, "overwrite")

   # Same ingestion for the customer_advance group (disabled):
   # source_feature_group_dataset = "s3://emrfssampledata/feature_store_customer_advance.csv"
   # emr_fg02.ingestion(source_feature_group_dataset, "overwrite")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

feature_partition_key:dt
feature_unique_key:customer_id
{'hoodie.datasource.write.precombine.field': 'dt', 'hoodie.datasource.write.recordkey.field': 'customer_id', 'hoodie.datasource.write.partitionpath.field': 'dt', 'hoodie.table.name': 'customer_base', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'customer_base', 'hoodie.datasource.hive_sync.database': 'emr_feature_store', 'hoodie.datasource.hive_sync.partition_fields': 'dt', 'hoodie.datasource.write.operation': 'upsert', 'hoodie.mergeSchema': 'true'}
root
 |-- customer_id: integer (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- dt: string (nullable = true)

None

## 4.test add new feature 

In [6]:
   # Register a new string feature on customer_base, then append the update
   # dataset to the group.
   emr_fg01.add_feature("identify_code", "string")

   update_feature_group_dataset = "s3://emrfssampledata/feature_store_customer_base_update.csv"
   emr_fg01.ingestion(update_feature_group_dataset, "append")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sql===alter table emr_feature_store.customer_base add columns (identify_code string)
added new feature:identify_code string
feature_partition_key:dt
feature_unique_key:customer_id
{'hoodie.datasource.write.precombine.field': 'dt', 'hoodie.datasource.write.recordkey.field': 'customer_id', 'hoodie.datasource.write.partitionpath.field': 'dt', 'hoodie.table.name': 'customer_base', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'customer_base', 'hoodie.datasource.hive_sync.database': 'emr_feature_store', 'hoodie.datasource.hive_sync.partition_fields': 'dt', 'hoodie.datasource.write.operation': 'upsert', 'hoodie.mergeSchema': 'true'}
root
 |-- customer_id: integer (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- ide

## 5.test feature group filter & time travel & join query 

In [7]:
   # Plain queries against the feature group (disabled):
   # emr_fg01.select_all().show(0)
   # emr_fg01.select(["customer_id","city_code","state_code","identify_code"]).show(0)

   # Time-travel query: the two bounds are yyyyMMddHHmmss strings and show up
   # in the generated SQL as a filter on _hoodie_commit_time.
   time_travel_query = emr_fg01.select_all().timeQuery("20211201000000", "20220118000000")
   time_travel_query.show(0)

   # Join of customer_base and customer_advance on customer_id (disabled):
   # emr_fg01.select(["customer_id","city_code","state_code"]).join(emr_fg02.select(["age","diabetes"]),"customer_id").show(5)
   

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

full_query sql is:select emr_feature_store.customer_base.customer_id,emr_feature_store.customer_base.city_code,emr_feature_store.customer_base.state_code,emr_feature_store.customer_base.country_code,emr_feature_store.customer_base.dt,emr_feature_store.customer_base.identify_code from emr_feature_store.customer_base where  customer_base._hoodie_commit_time>='20211201000000' and customer_base._hoodie_commit_time <='20220118000000'
+-----------+---------+----------+------------+--------+-------------+
|customer_id|city_code|state_code|country_code|      dt|identify_code|
+-----------+---------+----------+------------+--------+-------------+
|     124013|        4|         5|           2|20210305|         null|
|     109382|        2|        40|           2|20210212|  51109873784|
|     109365|        2|         5|           5|20210311|         null|
|     109367|        2|         5|           3|20210211|  51109873794|
|     573291|        1|        49|           2|20210211|  51109873783|

## 6.test training dataset retrieval

In [25]:
# Build a training dataset from customer_base joined to customer_advance.
#
# Single-group variant writing tfrecord output (disabled):
# emr_fg02.select_all().create_training_dataset(name = "userProfile dataset",
#                data_format = "tfrecord",
#                startDt="20211201000000",
#                endDt= "20220116000000",
#                outputLoc = "s3://emrfssampledata/traindataset/output")

# Selections are built in the same order as the original one-line chain.
base_features = emr_fg01.select(["customer_id", "city_code", "state_code", "identify_code"])
advance_features = emr_fg02.select(["age", "diabetes"])

# startDt/endDt appear in the generated SQL as bounds on _hoodie_commit_time.
base_features.join(advance_features, "customer_id").create_training_dataset(
    name="userProfile dataset",
    data_format="csv",
    startDt="20211201000000",
    endDt="20220130000000",
    outputLoc="s3://emrfssampledata/traindataset/output",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

full_query sql is:select emr_feature_store.customer_base.customer_id,emr_feature_store.customer_base.city_code,emr_feature_store.customer_base.state_code,emr_feature_store.customer_base.identify_code,emr_feature_store.customer_advance.age,emr_feature_store.customer_advance.diabetes from emr_feature_store.customer_base left join emr_feature_store.customer_advance on emr_feature_store.customer_base.customer_id=emr_feature_store.customer_advance.customer_id where  customer_base._hoodie_commit_time>='20211201000000' and customer_base._hoodie_commit_time <='20220130000000'