In [4]:
import pandas as pd
import os
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor

from feast import FeatureStore
import mlflow
import mlflow.sklearn

from sklearn.preprocessing import OrdinalEncoder, RobustScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression, Ridge, Lasso, BayesianRidge, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split, GridSearchCV

from utils.model.feature_vars import listing_features, host_features, review_features, fact_features
from utils.model.feast import query_data, get_historical_features
from utils.model.checking import df_description
from utils.model.outliers import outliers_handling
from utils.model.encoding import data_encoding
from utils.mlflow.creating import create_mlflow_experiment

db_config = {
  'user': 'admin',
  'password': 'admin123',
  'host': 'feast_postgres',
  'port': '5432',
  'database': 'feast_postgres'
}

def data_extraction():
  fs = FeatureStore(repo_path="./feature_repo")

  connection_string = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}"
  engine = create_engine(connection_string)

  queries = {
    'listing': "SELECT id, event_timestamp FROM listing_table",
    'host': "SELECT host_id, event_timestamp FROM host_table",
    'review': "SELECT id, event_timestamp FROM review_table",
    'fact': "SELECT id, event_timestamp FROM fact_table"
  }

  with ThreadPoolExecutor() as executor:
    listing_data, host_data, review_data, fact_data = executor.map(lambda q: query_data(engine, q), queries.values())

  listing_df = get_historical_features(fs, listing_data, listing_features)
  host_df = get_historical_features(fs, host_data, host_features)
  review_df = get_historical_features(fs, review_data, review_features)
  fact_df = get_historical_features(fs, fact_data, fact_features)

  # Drop event_timestamp columns
  for df in [listing_df, host_df, review_df, fact_df]:
    df.drop(columns=['event_timestamp'], inplace=True)

  # Merge dataframes
  df = pd.merge(listing_df, host_df, on="host_id", how="left").drop_duplicates(subset=['id'])
  df = pd.merge(df, review_df, on="id", how="left").drop_duplicates(subset=['id'])
  df = pd.merge(df, fact_df, on="id", how="left").drop_duplicates(subset=['id'])

  print(f"Length listing_df {len(listing_df)}")
  print(f"Length host_df {len(host_df)}")
  print(f"Length review_df {len(review_df)}")
  print(f"Length fact_df {len(fact_df)}")
  print(f"Length df {len(df)}")

  df.to_csv("./data.csv", index=False)

  return df

def data_validation(df):
  report = {}

  missing_values_count = df.isnull().sum()
  vals = []

  print(f"Length of dataframe: {len(df)}\n")

  for col in df.columns:
    missing_count = missing_values_count[col]
    col_type = df[col].dtype
    vals.append(f"Column: {col}, Missing Values: {missing_count}, Type: {col_type}")

  vals = "\n".join(vals)
  print(vals)

  return df


def data_preparation(df):
  # Outliers handling
  df = outliers_handling(df)

  # Data encoding
  df = data_encoding(df)

  # Data splitting
  features = df.drop("price", axis=1)
  target = df["price"]
  X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.3, random_state=42)

  # Data standardizing
  scaler = RobustScaler()
  X_train_scaled = scaler.fit_transform(X_train)
  X_test_scaled = scaler.transform(X_test)

  return X_train_scaled, X_test_scaled, y_train, y_test

def model_training(X_train_scaled, y_train):
  models_and_params = {
    "Linear_Regression": LinearRegression(),
    "Ridge_Regression": Ridge(alpha=0.001),
    "Lasso_Regression": Lasso(alpha=0.0001),
    "Bayesian_Ridge Regression": BayesianRidge(alpha_1=1e-6, lambda_1=1e-6),
    "ElasticNet_Regression": ElasticNet(alpha=0.01, l1_ratio=0.2),
    "Decision_Tree_Regression": DecisionTreeRegressor(max_depth=3),
  }

  trained_models = {}
  for name, model in models_and_params.items():
    model.fit(X_train_scaled, y_train)
    trained_models[name] = model

  return trained_models

def model_scoring(trained_models, X_test_scaled, y_test):
  res = []
  for name, model in trained_models.items():
    predictions = model.predict(X_test_scaled)
    rmse = mean_squared_error(y_test, predictions, squared=False)
    r2 = r2_score(y_test, predictions)
    res.append((name, model, model.get_params(), rmse, r2))

  return res

def model_exporting(res):
    mlflow.set_tracking_uri("http://mlflow:5000")
    experiment_id = create_mlflow_experiment(
      experiment_name="dev_model5",
      artifact_location="s3://artifacts"
    )

    with mlflow.start_run(run_name="experiment", experiment_id=experiment_id) as run:
        for name, model, params, rmse, r2 in res:
            with mlflow.start_run(run_name=name, nested=True) as run: 
                mlflow.log_params(params)
                mlflow.log_metric("rmse", rmse)
                mlflow.log_metric("r2", r2)
                mlflow.sklearn.log_model(model, artifact_path=name)
        print("run_id: {}".format(run.info.run_id))

if __name__ == "__main__":
  # df = data_extraction()
  df = pd.read_csv("./data.csv")
  df = data_validation(df)
  X_train_scaled, X_test_scaled, y_train, y_test = data_preparation(df)
  trained_models = model_training(X_train_scaled, y_train)
  res = model_scoring(trained_models, X_test_scaled, y_test)
  
  model_exporting(res)

Length of dataframe: 260369

Column: id, Missing Values: 0, Type: int64
Column: host_id, Missing Values: 0, Type: int64
Column: property_type, Missing Values: 0, Type: object
Column: room_type, Missing Values: 0, Type: object
Column: accommodates, Missing Values: 0, Type: float64
Column: bathrooms, Missing Values: 0, Type: float64
Column: bedrooms, Missing Values: 0, Type: float64
Column: beds, Missing Values: 0, Type: float64
Column: price, Missing Values: 0, Type: float64
Column: has_availability, Missing Values: 0, Type: object
Column: availability_30, Missing Values: 0, Type: float64
Column: availability_60, Missing Values: 0, Type: float64
Column: availability_90, Missing Values: 0, Type: float64
Column: availability_365, Missing Values: 0, Type: float64
Column: instant_bookable, Missing Values: 0, Type: object
Column: host_response_time, Missing Values: 0, Type: object
Column: host_response_rate, Missing Values: 0, Type: float64
Column: host_acceptance_rate, Missing Values: 0, Ty



Experiment dev_model5 already exists.


  return _bootstrap._gcd_import(name[level:], package, level)


run_id: c182879fc05349fe8a8a716bf137b29f


In [9]:
model_uri = "mlflow-artifacts:/66/c182879fc05349fe8a8a716bf137b29f/artifacts/Decision_Tree_Regression"
DTG = mlflow.sklearn.load_model(model_uri=model_uri)
print(DTG)

# Make predictions on the old data
predictions = DTG.predict(X_test_scaled)

# Evaluate predictions if actual values are available
mse = mean_squared_error(y_test, predictions)
r2 = r2_score(y_test, predictions)

print(f"Mean Squared Error: {mse}")
print(f"R-squared: {r2}")

print(predictions)

Downloading artifacts: 100%|██████████| 9/9 [00:00<00:00, 178.36it/s]

DecisionTreeRegressor(max_depth=3)
Mean Squared Error: 0.06186771565588156
R-squared: 0.43592440245415187
[2.13300986 2.26056703 2.13300986 ... 2.13300986 2.1445345  2.74485902]





In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
from typing import List
import numpy as np
import pandas as pd

app = FastAPI()

# Load the trained model
model_uri = "mlflow-artifacts:/66/c182879fc05349fe8a8a716bf137b29f/artifacts/Decision_Tree_Regression"
DTG = mlflow.sklearn.load_model(model_uri=model_uri)

class PredictionRequest(BaseModel):
    data: List[List[float]]

class PredictionResponse(BaseModel):
    predictions: List[float]
    mse: float = None
    r2: float = None

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        # Convert the input data to a DataFrame
        input_data = pd.DataFrame(request.data)
        
        # Assume X_test_scaled is preprocessed similarly
        # This part may vary depending on how you scale your input data
        X_test_scaled = input_data  # Adjust this line as needed

        # Make predictions
        predictions = DTG.predict(X_test_scaled)

        return PredictionResponse(
            predictions=predictions.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000, reload=True)


In [None]:
!uvicorn main:app --reload