# Feature Engineering/Ingestion

* https://examples.hopsworks.ai/featurestore/hsfs/basics/feature_engineering/
    
![Overview of the Hopsworks feature store](https://examples.hopsworks.ai/featurestore/hsfs/images/overview.svg)

In [1]:
import hsfs
# Create a connection (called without arguments, so hsfs uses whatever
# connection settings the notebook environment provides).
connection = hsfs.connection()
# Get the feature store handle for the project's feature store; `fs` is the
# entry point used by every cell below to create or fetch feature groups.
fs = connection.get_feature_store()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
4,application_1619542175403_0008,pyspark,idle,Link,Link


SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

In [2]:
from hops import hdfs
from pyspark.sql import functions as F

def _read_archive_csv(path_template):
    """Load one raw CSV file into a Spark DataFrame.

    The three source files are read with exactly the same reader options
    (header row present, schema inferred), so the reader chain lives here
    once instead of being copy-pasted per file.

    Args:
        path_template: HDFS path with a single `{}` placeholder, which is
            filled with the name returned by `hdfs.project_name()`.

    Returns:
        The DataFrame returned by the Spark CSV reader for that path.
    """
    csv_reader = (
        spark.read
        .option("inferSchema", "true")
        .option("header", "true")
        .format("csv")
    )
    return csv_reader.load(path_template.format(hdfs.project_name()))


# Raw inputs: store attributes, external ("exogenous") features, and sales.
stores_csv = _read_archive_csv("hdfs:///Projects/{}/Jupyter/hsfs/archive/stores data-set.csv")

exogenous_csv = _read_archive_csv("hdfs:///Projects/{}/Jupyter/hsfs/archive/Features data set.csv")

sales_csv = _read_archive_csv("hdfs:///Projects/{}/Jupyter/hsfs/archive/sales data-set.csv")

In [3]:
# Number of distinct departments with sales records, per store.
# The aggregate is aliased directly rather than renaming Spark's
# auto-generated "count(DISTINCT dept)" column afterwards: withColumnRenamed
# is a silent no-op when the source name does not match, which would leave the
# feature without its intended `num_depts` name.
stores_depts_count = (
    stores_csv
    .join(sales_csv, "store")
    .groupBy("store")
    .agg(F.countDistinct("dept").alias("num_depts"))
)

# Store feature group dataframe: the store attributes plus `num_depts`.
stores_fg = stores_csv.join(stores_depts_count, "store")

In [4]:
# Declare the store feature group (metadata only; the dataframe is written by
# the following `save` call). It is keyed on `store` and requests statistics
# with histograms and correlations switched on.
store_fg_meta = fs.create_feature_group(
    name="store_fg_abobora_quadrada",
    version=1,
    primary_key=['store'],
    description="Store related features",
    time_travel_format=None,
    statistics_config={"enabled": True, "histograms": True, "correlations": True},
)

In [5]:
# Save the `stores_fg` dataframe through the `store_fg_meta` feature group handle.
store_fg_meta.save(stores_fg)

<hsfs.feature_group.FeatureGroup object at 0x7f22928dddd0>

In [6]:
from pyspark.sql import Window
def days(i):
    """Return `i` days expressed in seconds (a negative `i` gives a negative offset)."""
    return i * 86400


def _trailing_window(partition_cols, lookback_days):
    """Build the range window covering the `lookback_days` days before each row.

    Args:
        partition_cols: column name or list of column names passed to
            `Window.partitionBy`.
        lookback_days: how many days back the window starts.

    Returns:
        A window ordered by the `timestamp` column (cast to long) whose range
        runs from `lookback_days` days before the current row's timestamp to
        one day before it, so the current row itself is never included.
    """
    return Window.partitionBy(partition_cols)\
                 .orderBy(F.col("timestamp").cast("long"))\
                 .rangeBetween(days(-lookback_days), days(-1))


# Parse the textual date and derive a numeric `timestamp` column to range over.
# NOTE(review): the pattern has three `y` letters ('dd/MM/yyy'); kept unchanged
# here - confirm whether 'dd/MM/yyyy' was intended.
sales_df = sales_csv.withColumn('date', F.to_date("date", 'dd/MM/yyy'))\
                    .withColumn('timestamp', F.unix_timestamp("date"))

# Define aggregation window to compute sales performances over the past period of time
# (per store and department)
last_month_window_store_dep = _trailing_window(['store', 'dept'], 30)
last_quarter_window_store_dep = _trailing_window(['store', 'dept'], 90)
last_six_month_window_store_dep = _trailing_window(['store', 'dept'], 180)
last_year_window_store_dep = _trailing_window(['store', 'dept'], 365)

# (per store, across all departments)
last_month_window_store = _trailing_window('store', 30)
last_quarter_window_store = _trailing_window('store', 90)
last_six_month_window_store = _trailing_window('store', 180)
last_year_window_store = _trailing_window('store', 365)

# Build feature group dataframe: one summed `weekly_sales` column per window.
# The list order is the order in which the columns are appended.
rolling_sales_features = [
    ("sales_last_month_store_dep", last_month_window_store_dep),
    ("sales_last_quarter_store_dep", last_quarter_window_store_dep),
    ("sales_last_six_month_store_dep", last_six_month_window_store_dep),
    ("sales_last_year_store_dep", last_year_window_store_dep),
    ("sales_last_month_store", last_month_window_store),
    ("sales_last_quarter_store", last_quarter_window_store),
    ("sales_last_six_month_store", last_six_month_window_store),
    ("sales_last_year_store", last_year_window_store),
]

sales_fg = sales_df
for feature_name, feature_window in rolling_sales_features:
    sales_fg = sales_fg.withColumn(feature_name, F.sum("weekly_sales").over(feature_window))

# `timestamp` was only needed for the windows; remaining nulls are replaced by 0.
sales_fg = sales_fg.drop("timestamp").fillna(0)

In [7]:
# Declare version 1 of the sales feature group, keyed on (store, dept, date).
# `statistics_config=False` is passed here, unlike the store feature group,
# which passes a dict of enabled statistics.
sales_fg_meta = fs.create_feature_group(
    name="sales_fg",
    version=1,
    primary_key=['store', 'dept', 'date'],
    description="Sales related features",
    time_travel_format=None,
    statistics_config=False,
)

In [8]:
# Save the `sales_fg` dataframe through the version-1 `sales_fg` handle.
sales_fg_meta.save(sales_fg)


<hsfs.feature_group.FeatureGroup object at 0x7ff08e09fa50>

In [9]:
# Version 2 of `sales_fg`: same dataframe, but declared with a partition key
# on `store` and without a primary key.
sales_part_fg_meta = fs.create_feature_group(
    name="sales_fg",
    version=2,
    partition_key=['store'],
    description="Sales related features",
    time_travel_format=None,
    statistics_config=False,
)
sales_part_fg_meta.save(sales_fg)

<hsfs.feature_group.FeatureGroup object at 0x7ff08e7de910>

In [10]:
# Version 3 of `sales_fg`: keyed on (store, dept, date) and created with
# `online_enabled=True`.
# NOTE(review): this rebinds `sales_part_fg_meta`, so the handle to the
# partitioned version 2 is no longer reachable through that name.
sales_part_fg_meta = fs.create_feature_group(
    name="sales_fg",
    version=3,
    primary_key=['store', 'dept', 'date'],
    online_enabled=True,
    description="Sales related features",
    time_travel_format=None,
    statistics_config=False,
)
sales_part_fg_meta.save(sales_fg)

<hsfs.feature_group.FeatureGroup object at 0x7ff08e0a23d0>

In [11]:
# Convert the textual `date` column to a date using the same pattern as the
# sales data.
exogenous_fg = exogenous_csv.withColumn(
    'date',
    F.to_date("date", 'dd/MM/yyy'),
)

# Declare the exogenous feature group, keyed on (store, date), with
# statistics, histograms and correlations enabled, then save the dataframe.
exogenous_fg_meta = fs.create_feature_group(
    name="exogenous_fg",
    version=1,
    primary_key=['store', 'date'],
    description="External features that influence sales, but are not under the control of the distribution chain",
    time_travel_format=None,
    statistics_config={"enabled": True, "histograms": True, "correlations": True},
)
exogenous_fg_meta.save(exogenous_fg)

<hsfs.feature_group.FeatureGroup object at 0x7ff08e0587d0>

In [12]:
# Copy of the exogenous features with every `date` moved forward by 365 days,
# used below as a second batch to insert.
# NOTE(review): the `_2013` suffix presumably assumes the source dates fall in 2012 - confirm.
exogenous_fg_2013 = exogenous_fg.withColumn('date', F.date_add('date', 365))

In [13]:
# Fetch version 1 of `exogenous_fg` from the feature store by name and insert
# the date-shifted dataframe into it.
exogenous_fg_meta = fs.get_feature_group('exogenous_fg', 1)
exogenous_fg_meta.insert(exogenous_fg_2013)

In [14]:
from hsfs.feature import Feature


In [15]:
# Append a new `double` feature named "appended_feature" to the feature group,
# declared with a default value of "10.0".
exogenous_fg_meta.append_features([Feature("appended_feature", type="double", default_value="10.0")])


<hsfs.feature_group.FeatureGroup object at 0x7ff08e05aa10>

In [18]:
# Create a separate, throw-away feature group from the same exogenous
# dataframe; it is fetched and deleted again in the next cell.
# NOTE(review): this rebinds `exogenous_fg_meta`, which until now pointed at
# `exogenous_fg` version 1.
exogenous_fg_meta = fs.create_feature_group(
    name="exogenous_fg_2_ave_maria",
    version=2,
    primary_key=['store', 'date'],
    description="External features that influence sales, but are not under the control of the distribution chain",
    time_travel_format=None,
    statistics_config=False,
)
exogenous_fg_meta.save(exogenous_fg)

<hsfs.feature_group.FeatureGroup object at 0x7ff07a6ef550>

In [19]:
# Look the feature group up again by name and version, then delete it.
exogenous_fg_meta = fs.get_feature_group('exogenous_fg_2_ave_maria', 2)
exogenous_fg_meta.delete()