## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

In [1]:
#!pip install -U hopsworks --quiet

In [2]:
import numpy as np
import pandas as pd
import gdown

In [3]:
# Link to the dataset
url = 'https://drive.google.com/uc?id=1bxaIuvmGoCD8mOnlVJdATP0zvYlc_8e5'

# Output filename
output = 'ratebeer.csv'

# Download the file from the Google Drive link
gdown.download(url, output, quiet=False)

# Load the dataset into a pandas DataFrame
data = pd.read_csv(output)

Downloading...
From (original): https://drive.google.com/uc?id=1bxaIuvmGoCD8mOnlVJdATP0zvYlc_8e5
From (redirected): https://drive.google.com/uc?id=1bxaIuvmGoCD8mOnlVJdATP0zvYlc_8e5&confirm=t&uuid=6d78289b-0995-4d0a-b7c0-fc85a344ac89
To: /Users/sebatiankristensen/Desktop/Data-Engineering-MLOps-Exam-Assignment-1/Data-Engineering-MLOps-Exam-Assignment/ratebeer.csv
100%|██████████| 1.13G/1.13G [00:50<00:00, 22.3MB/s]
  data = pd.read_csv(output)


In [4]:
# Check the first few rows of the dataframe
data.head()

Unnamed: 0,beer_name,beer_beerid,beer_brewerid,beer_abv,beer_style,review_appearance,review_aroma,review_palate,review_taste,review_overall,review_time,review_profilename,review_text
0,John Harvards Simcoe IPA,63836,8481,5.4,India Pale Ale (IPA),8.0,6.0,6.0,6.0,6.5,1157587200,hopdog,"On tap at the Springfield, PA location. Poured..."
1,John Harvards Simcoe IPA,63836,8481,5.4,India Pale Ale (IPA),8.0,6.0,8.0,7.0,6.5,1157241600,TomDecapolis,On tap at the John Harvards in Springfield PA....
2,John Harvards Cristal Pilsner,71716,8481,5.0,Bohemian Pilsener,8.0,5.0,6.0,6.0,7.0,958694400,PhillyBeer2112,"UPDATED: FEB 19, 2003 Springfield, PA. I've ne..."
3,John Harvards Fancy Lawnmower Beer,64125,8481,5.4,Kölsch,4.0,4.0,4.0,4.0,4.0,1157587200,TomDecapolis,On tap the Springfield PA location billed as t...
4,John Harvards Fancy Lawnmower Beer,64125,8481,5.4,Kölsch,4.0,4.0,4.0,4.0,4.0,1157587200,hopdog,"On tap at the Springfield, PA location. Poured..."


## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

In [5]:
# Converting the timestamp to datetime and updating the review_time column
data['review_time'] = pd.to_datetime(data['review_time'], unit='s')

In [6]:
# Extracting the year, month, and day of the week from the datetime
data['year'] = data['review_time'].dt.year
data['month'] = data['review_time'].dt.month
data['day_of_week'] = data['review_time'].dt.dayofweek  # Monday=0, Sunday=6

In [7]:
# aggregating the reviews
agg_reviews = data.groupby('beer_beerid').agg({
    'review_overall': ['mean', 'count'],
    'review_aroma': 'mean',
    'review_palate': 'mean',
    'review_taste': 'mean',
    'review_appearance': 'mean'
}).reset_index()
agg_reviews.columns = ['beer_beerid', 'avg_overall', 'review_count', 'avg_aroma', 'avg_palate', 'avg_taste', 'avg_appearance']
agg_reviews

Unnamed: 0,beer_beerid,avg_overall,review_count,avg_aroma,avg_palate,avg_taste,avg_appearance
0,13,6.886114,821,6.371498,6.730816,6.651644,7.049939
1,14,6.704887,266,6.259398,6.443609,6.383459,6.563910
2,15,7.215976,169,6.739645,6.970414,6.905325,7.337278
3,16,7.953747,1081,7.716004,7.574468,7.666050,7.922294
4,17,7.033994,353,6.691218,6.691218,6.781870,7.359773
...,...,...,...,...,...,...,...
87505,broyarde-l`eclipse-80766,6.333333,3,6.666667,6.000000,6.666667,7.333333
87506,broyarde-l`harfang-80769,5.250000,6,5.666667,5.000000,5.833333,5.666667
87507,la-saint-pierre-blonde-de-l`oncle-hansi-91690,5.666667,15,5.933333,5.733333,6.000000,5.866667
87508,s`tunnel-45990,6.818182,11,6.454545,6.909091,6.454545,7.636364


In [8]:
# Group data by reviewer names, count their reviews, and reset index to form a DataFrame with 'review_profilename' and 'review_count' columns.
reviewer_metrics = data.groupby('review_profilename').size().reset_index(name='review_count')
reviewer_metrics

Unnamed: 0,review_profilename,review_count
0,-BB-99,2
1,000pete1983,1
2,007BeerDrinker,1
3,007Lager,1
4,007lund,85
...,...,...
28681,zygomatic99,12
28682,zyster99,16
28683,zywiecporter,9
28684,zziemelis,5


In [9]:
import pickle

# Generating a pickle
with open('ratebeer.pkl', 'wb') as file:
    pickle.dump(data, file)

## <span style="color:#ff5f27;"> 🪄 Creating Feature Groups in Hopsworks </span>

### Setting Up Hopsworks

In [10]:
# intizialize Hopworks
import hopsworks as hs
project = hs.login()
fs = project.get_feature_store()

  from .autonotebook import tqdm as notebook_tqdm


Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/550038
Connected. Call `.close()` to terminate connection gracefully.


### Preproccessing for Hopsworks

In [11]:
import re

# Create a mapping dictionary to replace "/" with "_" for hopsworks compatibility
column_mapping = {col: col.replace("/", "_") for col in data.columns}

# Rename the columns using the mapping dictionary
data.rename(columns=column_mapping, inplace=True)

# Convert all feature names to lowercase
data.columns = data.columns.str.lower()

# Replace spaces with underscores
data.columns = data.columns.str.replace(' ', '_')

# Rename the columns
data.rename(columns=lambda x: re.sub(r'[^a-zA-Z0-9]', '_', x), inplace=True)

# Assuming beer_features is your DataFrame containing the features

# Replace invalid characters with underscores
data.columns = data.columns.str.replace('[^a-zA-Z0-9_]', '_')

# Ensure feature names length does not exceed 63 characters
data.columns = [col[:63] for col in data.columns]

### Beer Features

In [12]:
# Collect columns for beer features, including one-hot encoded styles
beer_columns = ['beer_beerid', 'beer_name', 'beer_brewerid', 'beer_abv', 'review_profilename', 'review_time'] + [col for col in data.columns if col.startswith('style_')]
beer_features = data[beer_columns].drop_duplicates(subset=['beer_beerid']).dropna(subset=['beer_beerid'])


In [13]:
# Convert data types immediately after defining beer_columns
data['beer_beerid'] = pd.to_numeric(data['beer_beerid'], errors='coerce').astype('Int64')
data['beer_brewerid'] = pd.to_numeric(data['beer_brewerid'], errors='coerce').astype('Int64')
data['beer_abv'] = pd.to_numeric(data['beer_abv'], errors='coerce').astype(float)

# Now collect the columns
beer_features = data[beer_columns].drop_duplicates(subset=['beer_beerid']).dropna(subset=['beer_beerid'])

In [14]:
# Feature group for Beers
beer_fg = fs.get_or_create_feature_group(
    name="beer_features",
    version=1,
    description="Basic information about beers, including one-hot encoded styles",
    primary_key=['beer_beerid'],
    event_time="review_time",
    online_enabled=True
)


In [15]:
try:
    beer_fg.insert(beer_features, write_options={"wait_for_job" : False})
    print("Data insertion initiated successfully.")
except Exception as e:
    print("Failed to insert data:", e)


Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/550038/fs/545861/fg/787861


Uploading Dataframe: 100.00% |██████████| Rows 87199/87199 | Elapsed Time: 00:12 | Remaining Time: 00:00


Launching job: beer_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/550038/jobs/named/beer_features_1_offline_fg_materialization/executions
Data insertion initiated successfully.


In [16]:
# update beer feature descriptions
beer_feature_descriptions = {
    "beer_beerid": "Unique identifier for each beer.",
    "beer_name": "Name of the beer.",
    "beer_brewerid": "Identifier for the brewer of the beer.",
    "beer_abv": "Alcohol by volume percentage of the beer.",
}

# Update descriptions for Beer Features
for feature, description in beer_feature_descriptions.items():
    beer_fg.update_feature_description(feature, description)

### Review Features

In [17]:
# Feature Group for Reviews
review_columns = ['review_appearance', 'review_aroma', 'review_palate', 'review_taste', 'review_overall', 'beer_beerid', 'review_profilename', 'review_time']
review_features = data[review_columns].drop_duplicates().dropna(subset=['beer_beerid'])

In [18]:
# Convert data types
review_features['beer_beerid'] = pd.to_numeric(review_features['beer_beerid'], errors='coerce').astype('Int64')
review_features['review_time'] = pd.to_datetime(review_features['review_time'], unit='s')

In [19]:
# Drop rows with missing values again after converting data types
review_features = review_features.dropna(subset=['beer_beerid'])

In [20]:
review_fg = fs.get_or_create_feature_group(
    name="review_features",
    version=1,
    description="Metrics about beer reviews, including user profiles",
    primary_key=['beer_beerid', 'review_profilename'],
    event_time="review_time",
    online_enabled=True
)

In [21]:
try:
    review_fg.insert(review_features, write_options={"wait_for_job" : False})
    print("Data insertion initiated successfully.")
except Exception as e:
    print("Failed to insert data:", e)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/550038/fs/545861/fg/787862


Uploading Dataframe: 100.00% |██████████| Rows 2715055/2715055 | Elapsed Time: 03:35 | Remaining Time: 00:00


Launching job: review_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/550038/jobs/named/review_features_1_offline_fg_materialization/executions
Data insertion initiated successfully.


In [22]:
# update review feature descriptions
review_feature_descriptions = {
    "review_appearance": "Rating of the beer's appearance (1-10).",
    "review_aroma": "Rating of the beer's aroma (1-10).",
    "review_palate": "Rating of the beer's palate (1-10).",
    "review_taste": "Rating of the beer's taste (1-10).",
    "review_overall": "Overall rating of the beer (1-10).",
    "beer_beerid": "Associated unique identifier for each beer.",
    "review_profilename": "Username of the reviewer.",
    "review_time": "Timestamp of the review."
}

# Update descriptions for Review Features
for feature, description in review_feature_descriptions.items():
    review_fg.update_feature_description(feature, description)

### Aggregated Beer Reviews Features

In [23]:
agg_reviews['beer_beerid'] = pd.to_numeric(agg_reviews['beer_beerid'], errors='coerce').astype('Int64')

In [24]:
# Ensure all relevant fields are integers or floats as needed
agg_reviews['beer_beerid'] = pd.to_numeric(agg_reviews['beer_beerid'], errors='coerce').astype('Int64')
agg_reviews['avg_overall'] = pd.to_numeric(agg_reviews['avg_overall'], errors='coerce').astype(float)
agg_reviews['avg_aroma'] = pd.to_numeric(agg_reviews['avg_aroma'], errors='coerce').astype(float)
agg_reviews['avg_palate'] = pd.to_numeric(agg_reviews['avg_palate'], errors='coerce').astype(float)
agg_reviews['avg_taste'] = pd.to_numeric(agg_reviews['avg_taste'], errors='coerce').astype(float)
agg_reviews['avg_appearance'] = pd.to_numeric(agg_reviews['avg_appearance'], errors='coerce').astype(float)

In [25]:
# Drop rows where the primary key is null if any
agg_reviews = agg_reviews.dropna(subset=['beer_beerid'])

In [26]:
# Create the feature group for aggregated beer reviews
agg_reviews_fg = fs.get_or_create_feature_group(
    name="agg_reviews",
    version=1,
    description="Aggregated review metrics for each beer",
    primary_key=['beer_beerid'],
    online_enabled=True
)

In [27]:
try:
    agg_reviews_fg.insert(agg_reviews, write_options={"wait_for_job" : False})
    print("Data insertion initiated successfully.")
except Exception as e:
    print("Failed to insert data:", e)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/550038/fs/545861/fg/787863


Uploading Dataframe: 100.00% |██████████| Rows 87505/87505 | Elapsed Time: 00:11 | Remaining Time: 00:00


Launching job: agg_reviews_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/550038/jobs/named/agg_reviews_1_offline_fg_materialization/executions
Data insertion initiated successfully.


In [28]:
# Update feature descriptions
agg_reviews_feature_descriptions = {
    "beer_beerid": "Unique identifier for each beer.",
    "avg_overall": "Average overall rating of the beer.",
    "review_count": "Total number of reviews for the beer.",
    "avg_aroma": "Average aroma rating for the beer.",
    "avg_palate": "Average palate rating for the beer.",
    "avg_taste": "Average taste rating for the beer.",
    "avg_appearance": "Average appearance rating for the beer."
}

for feature, description in agg_reviews_feature_descriptions.items():
    agg_reviews_fg.update_feature_description(feature, description)

### Reviewer Metrics Features

In [29]:
# Create the feature group for reviewer metrics
reviewer_metrics_fg = fs.get_or_create_feature_group(
    name="reviewer_metrics",
    version=1,
    description="Count of reviews submitted by each reviewer",
    primary_key=['review_profilename'],
    online_enabled=True
)

# Insert the data into the feature group
reviewer_metrics_fg.insert(reviewer_metrics)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/550038/fs/545861/fg/787864


Uploading Dataframe: 100.00% |██████████| Rows 28686/28686 | Elapsed Time: 00:07 | Remaining Time: 00:00


Launching job: reviewer_metrics_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/550038/jobs/named/reviewer_metrics_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x3902b1250>, None)

In [30]:
# Update feature descriptions
reviewer_metrics_feature_descriptions = {
    "review_profilename": "Username of the reviewer.",
    "review_count": "Total number of reviews submitted by the reviewer."
}

for feature, description in reviewer_metrics_feature_descriptions.items():
    reviewer_metrics_fg.update_feature_description(feature, description)