In [14]:
import numpy as np

np.int = int
np.float = float
np.bool = bool

import hsfs

connection = hsfs.connection(
    host="c.app.hopsworks.ai",
    project="tynmarket",
    api_key_value=""
)

Connected. Call `.close()` to terminate connection gracefully.


In [15]:
fs = connection.get_feature_store() 

In [5]:
import datetime
import pandas as pd

day_trading_features = pd.read_csv("~/git/tmp/data/fifteen_percent_gains_features.csv")

print(day_trading_features.head())
print(day_trading_features.dtypes)


    close   volume                       date
0   99.98  93417.0  2020-01-02 14:30:00+00:00
1   99.78  16685.0  2020-01-02 14:31:00+00:00
2  100.14  21998.0  2020-01-02 14:32:00+00:00
3  100.35  18348.0  2020-01-02 14:33:00+00:00
4  100.55  22181.0  2020-01-02 14:34:00+00:00
close     float64
volume    float64
date       object
dtype: object


In [6]:
# Ticker symbol
day_trading_features['symbol'] = 'TWLO'

# 文字列をdatetimeに変換
day_trading_features['date'] = day_trading_features['date'].apply(lambda x:datetime.datetime.strptime(x[:-6], '%Y-%m-%d %H:%M:%S'))
# dateをインデックスに設定
day_trading_features.set_index('date', inplace=True)
# タイムゾーンをUS/Pacificに設定
day_trading_features = day_trading_features.tz_localize('US/Pacific')
# dateをint型に変換
day_trading_features['date'] = day_trading_features.index.astype(int)

day_trading_features.head()

Unnamed: 0_level_0,close,volume,symbol,date
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-01-02 14:30:00-08:00,99.98,93417.0,TWLO,1578004200000000000
2020-01-02 14:31:00-08:00,99.78,16685.0,TWLO,1578004260000000000
2020-01-02 14:32:00-08:00,100.14,21998.0,TWLO,1578004320000000000
2020-01-02 14:33:00-08:00,100.35,18348.0,TWLO,1578004380000000000
2020-01-02 14:34:00-08:00,100.55,22181.0,TWLO,1578004440000000000


In [7]:
tweet_df = pd.read_csv("~/git/tmp/data/twlo_tweets.csv", encoding='ISO-8859-1')

tweet_df.index = pd.to_datetime(tweet_df['date_tweeted'], format='ISO8601')
tweet_df.index = tweet_df.index.tz_convert('US/Pacific')
del tweet_df['date_tweeted']

tweet_df.sort_index(inplace=True)
tweet_df.dropna(inplace=True)

rolling_7_day_total_tweets = tweet_df.resample('1T')['tweet_unique_id'].count().rolling('7D').sum()
rolling_1_day_verified_count = tweet_df.resample('1T')['author_verified'].sum().rolling('1D').sum()

twitter_stats = pd.DataFrame({
    'feature__rolling_7_day_total_tweets': rolling_7_day_total_tweets,
    'feature__rolling_1_day_verified_count': rolling_1_day_verified_count
})

twitter_stats.index = pd.to_datetime(twitter_stats.index)
twitter_stats.index = twitter_stats.index.tz_convert('US/Pacific')

day_trading_features = day_trading_features.merge(twitter_stats, left_index=True, right_index=True)

In [14]:
twitter_fg = fs.create_feature_group(name="twitter_features",  # 特徴量グループの名前
                                    primary_key=["symbol"],  # データ内でエンティティを識別するキー
                                    event_time = "date",  # どのデータがより新しいかを伝える
                                    version=1,  # 特徴量のバージョン
                                    description="Twitter Features", # 特徴量グループの説明
                                    online_enabled=True) # オンラインモードを有効化する

# 特徴量を選択
twitter_features = day_trading_features[['symbol', 'date', 'feature__rolling_7_day_total_tweets', 'feature__rolling_1_day_verified_count']]

# 特徴量を保存
twitter_fg.save(twitter_features)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1016980/fs/1008707/fg/1169738


Uploading Dataframe: 0.00% |          | Rows 0/146502 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: twitter_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1016980/jobs/named/twitter_features_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x30bd7cc10>, None)

In [18]:
price_volume_fg = fs.create_feature_group(name="price_volume_features",  # 特徴量グループの名前
                                    primary_key=["symbol"],  # データ内でエンティティを識別するキー
                                    event_time = "date",  # どのデータがより新しいかを伝える
                                    version=1,  # 特徴量のバージョン
                                    description="Price and Volume Features", # 特徴量グループの説明
                                    online_enabled=True) # オンラインモードを有効化する

# price_volume_fg = fs.get_feature_group('price_volume_features', version=1)
# price_volume_fg.delete()

# Twitter以外の特徴量を選択、保存
price_volume_fg.save(day_trading_features.drop(['feature__rolling_7_day_total_tweets', 'feature__rolling_1_day_verified_count'], axis=1))

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1016980/fs/1008707/fg/1187159


Uploading Dataframe: 0.00% |          | Rows 0/146502 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: price_volume_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1016980/jobs/named/price_volume_features_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x307fb57e0>, None)

In [19]:
twitter_fg = fs.get_feature_group(name="twitter_features", version='1')
price_volume_fg = fs.get_feature_group(name="price_volume_features", version='1')

In [20]:
# 2つの特徴量グループを組み合わせて元のデータを復元
query = twitter_fg.select_all().join(price_volume_fg.select_all(), on=['date', 'symbol']) 

query.show(2)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (9.47s) 


Unnamed: 0,symbol,date,feature__rolling_7_day_total_tweets,feature__rolling_1_day_verified_count,close,volume
0,TWLO,1578004200000000000,215.0,4.0,99.98,93417.0
1,TWLO,1578004380000000000,215.0,4.0,100.35,18348.0
