In [0]:
configs = {
    "fs.azure.account.key.datalaketfexample.blob.core.windows.net": "7bZ7+qULn3sdaGQXqfJzzohWqqy172fopVsPA7X341sr31rdSUnUqPQrIN3aPz9Xi/U9Z/2Z/alu+AStkU42pg=="
}

dbutils.fs.mount(
    source="wasbs://bronze@datalaketfexample.blob.core.windows.net/",
    mount_point="/mnt/bronze",
    extra_configs=configs
)
dbutils.fs.mount(
    source="wasbs://silver@datalaketfexample.blob.core.windows.net/",
    mount_point="/mnt/silver",
    extra_configs=configs
)


Out[1]: True

In [0]:
from pyspark.sql import SparkSession
import pandas as pd
bronze_path = "/mnt/bronze/"
bronze_data = spark.read.csv(bronze_path, header=True, inferSchema=True)



In [0]:
# List all CSV files in the Bronze folder
# (This assumes you have a way to list files. For simplicity, listing them from a local file system)
file_list = dbutils.fs.ls(bronze_path)  # Use dbutils to list files in the mounted ADLS directory

# Collect file paths
file_paths = [file.path for file in file_list if file.path.endswith('.csv')]

# Initialize an empty list to hold DataFrames
df_list = []

# Iterate through each file and read into Pandas
for file_path in file_paths:
    # Read each file into a Pandas DataFrame
    temp_df = spark.read.csv(file_path, header=True, inferSchema=True).toPandas()
    df_list.append(temp_df)

# Concatenate all DataFrames into one
bronze_df = pd.concat(df_list, axis=0, ignore_index=True)

In [0]:
# Create new features
bronze_df['MDC_PULS_OXIM_PULS_RATE_Result_min'] = bronze_df['MDC_PULS_OXIM_PULS_RATE_Result'].rolling(window=2).min()
bronze_df['MDC_TEMP_Result_mean'] = bronze_df['MDC_TEMP_Result'].rolling(window=2).mean()
bronze_df['MDC_PULS_OXIM_PULS_RATE_Result_mean'] = bronze_df['MDC_PULS_OXIM_PULS_RATE_Result'].rolling(window=2).mean()
bronze_df['HR_to_RR_Ratio'] = bronze_df['MDC_ECG_HEART_RATE_Result'] / bronze_df['MDC_TTHOR_RESP_RATE_Result']

# Drop rows with missing values due to rolling calculations
bronze_df = bronze_df.dropna().reset_index(drop=True)

# Convert back to Spark DataFrame for saving
silver_df = spark.createDataFrame(bronze_df)


  Expected bytes, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
# Select the required columns from bronze_df
silver_df = silver_df.select(
    "Result Time",
    "Patient ID",
    "Label",
    "MDC_PULS_OXIM_PULS_RATE_Result_min",
    "MDC_TEMP_Result_mean",
    "MDC_PULS_OXIM_PULS_RATE_Result_mean",
    "HR_to_RR_Ratio"
)

# Show the selected Silver DataFrame for verification
silver_df.show()

# Save the Silver DataFrame to the Silver container in the mounted path
silver_path = "/mnt/silver/silver_data.parquet"
silver_df.write.mode("overwrite").parquet(silver_path)

print(f"Silver data saved successfully to {silver_path}")


+-------------------+------------+-----+----------------------------------+--------------------+-----------------------------------+------------------+
|        Result Time|  Patient ID|Label|MDC_PULS_OXIM_PULS_RATE_Result_min|MDC_TEMP_Result_mean|MDC_PULS_OXIM_PULS_RATE_Result_mean|    HR_to_RR_Ratio|
+-------------------+------------+-----+----------------------------------+--------------------+-----------------------------------+------------------+
|2024-12-18 00:00:05|NiCdAaJcQbLf|    0|                             147.0|                98.4|                              148.0|2.9215686274509802|
|2024-12-18 00:00:06|NiCdAaJcQbLf|    0|                             149.0|                98.4|                              149.0|2.9215686274509802|
|2024-12-18 00:00:07|NiCdAaJcQbLf|    0|                             149.0|                98.4|                              149.0|2.9019607843137254|
|2024-12-18 00:00:08|NiCdAaJcQbLf|    0|                             149.0|             

In [0]:
!pip install mlflow

Collecting mlflow
  Downloading mlflow-2.19.0-py3-none-any.whl (27.4 MB)
[?25l[K     |                                | 10 kB 13.7 MB/s eta 0:00:03[K     |                                | 20 kB 8.1 MB/s eta 0:00:04[K     |                                | 30 kB 6.0 MB/s eta 0:00:05[K     |                                | 40 kB 4.0 MB/s eta 0:00:07[K     |                                | 51 kB 4.8 MB/s eta 0:00:06[K     |                                | 61 kB 4.9 MB/s eta 0:00:06[K     |                                | 71 kB 4.8 MB/s eta 0:00:06[K     |                                | 81 kB 4.9 MB/s eta 0:00:06[K     |                                | 92 kB 5.5 MB/s eta 0:00:05[K     |▏                               | 102 kB 4.6 MB/s eta 0:00:06[K     |▏                               | 112 kB 4.6 MB/s eta 0:00:06[K     |▏                               | 122 kB 4.6 MB/s eta 0:00:06[K     |▏                               | 133 kB 4.6 MB/s eta 0:00:06[K

In [0]:
mlflow.set_tracking_uri("databricks")


In [0]:
import mlflow

experiment_name = "/Users/tanmay.c.kadam@gmail.com/silver_layer_scaler_experiment1"
experiment = mlflow.get_experiment_by_name(experiment_name)

if experiment:
    print(f"Experiment '{experiment_name}' already exists with ID {experiment.experiment_id}")
else:
    print(f"Experiment '{experiment_name}' does not exist.")


Experiment '/Users/tanmay.c.kadam@gmail.com/silver_layer_scaler_experiment1' already exists with ID 802063317058201


In [0]:
if not experiment:
    experiment_id = mlflow.create_experiment(experiment_name)
    print(f"Created experiment '{experiment_name}' with ID {experiment_id}")
else:
    experiment_id = experiment.experiment_id

mlflow.set_experiment(experiment_name)


Out[62]: <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/802063317058201', creation_time=1734534390430, experiment_id='802063317058201', last_update_time=1734534390430, lifecycle_stage='active', name='/Users/tanmay.c.kadam@gmail.com/silver_layer_scaler_experiment1', tags={'mlflow.experiment.sourceName': '/Users/tanmay.c.kadam@gmail.com/silver_layer_scaler_experiment1',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'tanmay.c.kadam@gmail.com',
 'mlflow.ownerId': '1567855716883546'}>

In [0]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import mlflow
import mlflow.sklearn

# Initialize SparkSession
spark = SparkSession.builder.appName("ModelPreparation").getOrCreate()

# Load the Silver DataFrame from the Silver layer
silver_path = "/mnt/silver/silver_data.parquet"
silver_df = spark.read.parquet(silver_path)

# Convert Spark DataFrame to Pandas DataFrame
silver_pd_df = silver_df.toPandas()

# Preprocessing
# Select features and label
features = [
    "MDC_PULS_OXIM_PULS_RATE_Result_min",
    "MDC_TEMP_Result_mean",
    "MDC_PULS_OXIM_PULS_RATE_Result_mean",
    "HR_to_RR_Ratio"
]
X = silver_pd_df[features]
y = silver_pd_df["Label"]

# Handle infinite and missing values
X.replace([np.inf, -np.inf], np.nan, inplace=True)
X.fillna(0, inplace=True)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standard Scaling
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)



with mlflow.start_run(run_name="scaler_silver_layer"):
    # Log the scaler as an artifact
    mlflow.sklearn.log_model(scaler, artifact_path="standard_scaler")
    print("Scaler saved to MLflow successfully!")

# Print Scaler Information
print("Scaler mean:", scaler.mean_)
print("Scaler scale:", scaler.scale_)


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X.replace([np.inf, -np.inf], np.nan, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X.fillna(0, inplace=True)


Scaler saved to MLflow successfully!
🏃 View run scaler_silver_layer at: https://community.cloud.databricks.com/ml/experiments/802063317058201/runs/5697ad6983324a02a9c7afa803689e1f
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/802063317058201
Scaler mean: [157.4857369   97.84971612 157.88748712   3.90363263]
Scaler scale: [14.17710868  0.77943673 14.09069883  1.22924051]


In [0]:
!pip install xgboost
!pip install lightgbm

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-ab4e4b28-76a2-42b6-8901-e25ed22e29e5/bin/python -m pip install --upgrade pip' command.[0m
Collecting lightgbm
  Downloading lightgbm-4.5.0-py3-none-manylinux_2_28_x86_64.whl (3.6 MB)
[?25l[K     |                                | 10 kB 32.8 MB/s eta 0:00:01[K     |▏                               | 20 kB 8.7 MB/s eta 0:00:01[K     |▎                               | 30 kB 12.5 MB/s eta 0:00:01[K     |▍                               | 40 kB 5.8 MB/s eta 0:00:01[K     |▌                               | 51 kB 5.9 MB/s eta 0:00:01[K     |▌                               | 61 kB 7.0 MB/s eta 0:00:01[K     |▋                               | 71 kB 7.5 MB/s eta 0:00:01[K     |▊                               | 81 kB 5.9 MB/s eta 0:00:01[K     |▉                               | 92 kB 6.6 MB/s eta 0:00:01[K     |█                               | 102 kB 6.3 MB/s eta 0:00:01[K     |█           

In [0]:
from sklearn.model_selection import RandomizedSearchCV,train_test_split
from sklearn.metrics import f1_score
from sklearn.ensemble import RandomForestClassifier
def train_and_log_model(model,param, model_name):
    with mlflow.start_run(run_name=model_name):
        mlflow.log_params(param)
        gridSearch=RandomizedSearchCV(model,param,cv=3,scoring='f1',random_state=42)
        gridSearch.fit(X_train_scaled,y_train)
        best_model=gridSearch.best_estimator_
        y_pred=best_model.predict(X_test_scaled)
        f1 = f1_score(y_test, y_pred)
        print(f'{model_name} F1 score is : {f1}')
        mlflow.log_metric('f1',f1)
        mlflow.sklearn.log_model(best_model,model_name)

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.svm import SVC
models = {
    "RandomForest": (RandomForestClassifier(), {"n_estimators": [50, 100, 200], "max_depth": [3, 5, 10]}),
    "XGBoost": (XGBClassifier(use_label_encoder=False, eval_metric='logloss'), {"n_estimators": [50, 100], "learning_rate": [0.01, 0.1], "max_depth": [3, 5]}),

    "LightGBM": (LGBMClassifier(), {"n_estimators": [50, 100], "learning_rate": [0.01, 0.1], "max_depth": [-1, 5]})
}


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [0]:
# Run Experiments
for model_name, (model, params) in models.items():
    train_and_log_model(model, params, model_name)



RandomForest F1 score is : 0.8599875112745438




🏃 View run RandomForest at: https://community.cloud.databricks.com/ml/experiments/802063317058201/runs/0f4da99408484e33a367655481ae8396
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/802063317058201


Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encode

XGBoost F1 score is : 0.8484764157506609




🏃 View run XGBoost at: https://community.cloud.databricks.com/ml/experiments/802063317058201/runs/dd085604f0cb41e3956d7d2326d64c32
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/802063317058201




[LightGBM] [Info] Number of positive: 37498, number of negative: 34950
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002084 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 651
[LightGBM] [Info] Number of data points in the train set: 72448, number of used features: 4
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.517585 -> initscore=0.070369
[LightGBM] [Info] Start training from score 0.070369
[LightGBM] [Info] Number of positive: 37497, number of negative: 34951
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.002441 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 651
[LightGBM] [Info] Number of data points in the train set: 72448, number of used features: 4
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.517571 -> initscore=0.070314
[LightGBM] [Info] Start training from score 0.070314
[LightGBM] [Info] Nu



🏃 View run LightGBM at: https://community.cloud.databricks.com/ml/experiments/802063317058201/runs/4095ff0d20854b83ad198866d0297732
🧪 View experiment at: https://community.cloud.databricks.com/ml/experiments/802063317058201
