# Incident prediction features Example

In [23]:
import os
from pprint import pprint
pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")})

{'FEAST_CORE_URL': 'core:6565',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT': 'parquet',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': 'file:///shared/historical_feature_output',
 'FEAST_REDIS_HOST': 'redis',
 'FEAST_SERVING_URL': 'online_serving:6566',
 'FEAST_SPARK_HOME': '/usr/local/spark',
 'FEAST_SPARK_LAUNCHER': 'standalone',
 'FEAST_SPARK_STAGING_LOCATION': 'file:///shared/staging',
 'FEAST_SPARK_STANDALONE_MASTER': 'local'}


### Basic Imports and Feast Client initialization

In [24]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [25]:
client = Client()

### Taking indicident data as from batch source

Here data is treated as been taken from the batch source. We are using dataframe as source of data.

For both batch and stream sources, the following configurations are necessary:
Event timestamp column: Name of column containing timestamp when event data occurred. Used during point-in-time join of feature values to entity timestamps.
Created timestamp column: Name of column containing timestamp when data is created. Used to deduplicate data when multiple copies of the same entity key is ingested.

In [26]:
import numpy as np
import pandas as pd
from datetime import datetime

df = pd.read_csv("incident_clean.csv").drop(['Unnamed: 0'], axis=1)

old_column = df.columns
new_column = []

for c in old_column:
    c = c.replace(" ", "_").replace("(", "").replace(")", "")
    new_column.append(c.lower())


df.columns = new_column

df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(2020, 10, 10).timestamp(),
                datetime(2020, 10, 20).timestamp(),
                size=len(df)),
        unit="s"
    )
df['created'] = pd.to_datetime(datetime.now())

df = df[["incident_id","ci_name_aff","ci_type_aff","service_component_wbs_aff","datetime","created"]]
df

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,incident_id,ci_name_aff,ci_type_aff,service_component_wbs_aff,datetime,created
0,IM0000004,SUB000508,subapplication,WBS000162,2020-10-18 20:01:34,2020-12-02 12:57:22.406940
1,IM0000005,WBA000124,application,WBS000088,2020-10-10 03:28:56,2020-12-02 12:57:22.406940
2,IM0000006,DTA000024,application,WBS000092,2020-10-15 11:28:52,2020-12-02 12:57:22.406940
3,IM0000011,WBA000124,application,WBS000088,2020-10-14 05:03:00,2020-12-02 12:57:22.406940
4,IM0000012,WBA000124,application,WBS000088,2020-10-10 05:15:30,2020-12-02 12:57:22.406940
...,...,...,...,...,...,...
46601,IM0047053,SBA000464,application,WBS000073,2020-10-12 22:27:52,2020-12-02 12:57:22.406940
46602,IM0047054,SBA000461,application,WBS000073,2020-10-11 14:31:06,2020-12-02 12:57:22.406940
46603,IM0047055,LAP000019,computer,WBS000091,2020-10-13 18:03:30,2020-12-02 12:57:22.406940
46604,IM0047056,WBA000058,application,WBS000073,2020-10-16 06:41:05,2020-12-02 12:57:22.406940


# Creating Entity

In [27]:
incident_id = Entity(name="incident_id", description="incident identifier", value_type=ValueType.STRING)

# Creating features

In [28]:
# Daily updated features 
ci_name = Feature("ci_name_aff", ValueType.STRING)
ci_type = Feature("ci_type_aff", ValueType.STRING)
service_component = Feature("service_component_wbs_aff", ValueType.STRING)

# Sources

In [29]:
import os
demo_data_location = os.path.join(os.getenv("FEAST_SPARK_STAGING_LOCATION", "file:///home/jovyan/"), "test_data")

In [30]:
incident_prediction_source_uri = os.path.join(demo_data_location, "incident_prediction")

# Feature table 

In [31]:
incident_prediction_FT = FeatureTable(
    name = "incident_prediction",
    entities = ["incident_id"],
    features = [
        ci_name,
        ci_type,
        service_component
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=incident_prediction_source_uri,
        date_partition_column="date"
    )
)

# registring feature table and entity in feast client

In [32]:
client.apply_entity(incident_id)
client.apply_feature_table(incident_prediction_FT)

In [33]:
print(client.get_feature_table("incident_prediction").to_yaml())

spec:
  name: incident_prediction
  entities:
  - incident_id
  features:
  - name: service_component_wbs_aff
    valueType: STRING
  - name: ci_type_aff
    valueType: STRING
  - name: ci_name_aff
    valueType: STRING
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///shared/staging/test_data/incident_prediction
meta:
  createdTimestamp: '2020-11-30T11:32:50Z'



# Ingesting data in feast batch supported database . 

In [34]:
client.ingest(incident_prediction_FT, df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


In [35]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [36]:
def read_parquet(uri):
    parsed_uri = urlparse(uri)
    if parsed_uri.scheme == "file":
        return pd.read_parquet(parsed_uri.path)
    elif parsed_uri.scheme == "gs":
        fs = gcsfs.GCSFileSystem()
        files = ["gs://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 's3':
        import s3fs
        fs = s3fs.S3FileSystem()
        files = ["s3://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    else:
        raise ValueError(f"Unsupported URL scheme {uri}")

# Getting training features

# Entity DataFrame

In [37]:
entities_with_timestamp = df[["incident_id"]]
entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint(
    datetime(2020, 10, 18).timestamp(),
    datetime(2020, 10, 20).timestamp(),
    size=len(df)), unit='s')
entities_with_timestamp

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """


Unnamed: 0,incident_id,event_timestamp
0,IM0000004,2020-10-18 05:58:58
1,IM0000005,2020-10-19 08:19:19
2,IM0000006,2020-10-18 13:01:03
3,IM0000011,2020-10-18 02:40:23
4,IM0000012,2020-10-19 04:37:58
...,...,...
46601,IM0047053,2020-10-19 19:30:40
46602,IM0047054,2020-10-18 13:05:46
46603,IM0047055,2020-10-19 12:09:01
46604,IM0047056,2020-10-18 09:45:35


In [38]:
job = client.get_historical_features(
    feature_refs=[
        "incident_prediction:ci_name_aff",
        "incident_prediction:ci_type_aff",
        "incident_prediction:service_component_wbs_aff"
    ], 
    entity_source=entities_with_timestamp
)

In [39]:
output_file_uri = job.get_output_file_uri()

In [40]:
read_parquet(output_file_uri)

Unnamed: 0,incident_id,event_timestamp,incident_prediction__service_component_wbs_aff,incident_prediction__ci_type_aff,incident_prediction__ci_name_aff
0,IM0000035,2020-10-18 16:46:57,WBS000088,application,WBA000124
1,IM0000420,2020-10-19 07:01:53,WBS000014,application,SBA000172
2,IM0000432,2020-10-19 05:28:59,WBS000072,application,SBA000263
3,IM0000679,2020-10-18 16:16:59,WBS000072,application,SBA000263
4,IM0000757,2020-10-18 16:30:30,WBS000072,application,SBA000263
...,...,...,...,...,...
46601,IM0046584,2020-10-18 17:32:16,WBS000073,application,SBA000462
46602,IM0046698,2020-10-18 19:11:32,WBS000017,application,SBA000073
46603,IM0046899,2020-10-18 16:47:43,WBS000072,application,SBA000263
46604,IM0046900,2020-10-19 06:51:37,WBS000048,application,SBA000317


In [41]:
entities_with_timestamp = df[["incident_id"]].iloc[:10]
entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint(
    datetime(2020, 10, 18).timestamp(),
    datetime(2020, 10, 20).timestamp(),
    size=10), unit='s')

job = client.get_historical_features(
    feature_refs=[
        "incident_prediction:ci_name_aff",
        "incident_prediction:ci_type_aff",
        "incident_prediction:service_component_wbs_aff"
    ], 
    entity_source=entities_with_timestamp
)

In [42]:
output_file_uri = job.get_output_file_uri()

In [43]:
read_parquet(output_file_uri)

Unnamed: 0,incident_id,event_timestamp,incident_prediction__service_component_wbs_aff,incident_prediction__ci_type_aff,incident_prediction__ci_name_aff
0,IM0000012,2020-10-19 04:06:21,WBS000088,application,WBA000124
1,IM0000013,2020-10-19 14:55:23,WBS000088,application,WBA000124
2,IM0000006,2020-10-18 23:46:51,WBS000092,application,DTA000024
3,IM0000018,2020-10-19 01:59:47,WBS000055,application,WBA000082
4,IM0000011,2020-10-19 23:35:45,WBS000088,application,WBA000124
5,IM0000005,2020-10-18 17:56:19,WBS000088,application,WBA000124
6,IM0000014,2020-10-18 17:53:20,WBS000055,application,WBA000082
7,IM0000015,2020-10-19 07:37:26,WBS000088,application,WBA000124
8,IM0000004,2020-10-18 19:28:26,WBS000162,subapplication,SUB000508
9,IM0000017,2020-10-19 03:44:43,WBS000088,application,WBA000124
