## External Parameters

In [None]:
# Read the optional --env flag from the command line. parse_known_args is used
# so that arguments this notebook does not declare are collected separately
# instead of making argparse exit with an error.
p = ArgumentParser()
p.add_argument("--env", type=str, required=False)
namespace, _unrecognised_args = p.parse_known_args(sys.argv[1:])

## Spark Utilities

In [None]:
from training import SparkRunner
# Instantiate the project's SparkRunner helper; later cells also call it to read secrets.
spark_obj = SparkRunner()
# Keep the runner's `spark` attribute under a short name for the cells below.
spark = spark_obj.spark

## Retrieve Secrets

In [None]:
# Azure resource identifiers, later passed to AzureMLConfiguration.
# NOTE(review): the two positional arguments look like (secret scope, secret key)
# — confirm against SparkRunner.get_secret.
subscription_id = spark_obj.get_secret("DBX_SP_Credentials", "SUBSCRIPTION_ID")
resource_group = spark_obj.get_secret("AzureResourceSecrets", "RESOURCE_GROUP_NAME")
workspace_name = spark_obj.get_secret("AzureResourceSecrets", "AML_WS_NAME")

# Service-principal identity and credential.
tenant_id = spark_obj.get_secret("DBX_SP_Credentials", "DBX_SP_Tenant_ID")
service_principal_id = spark_obj.get_secret("DBX_SP_Credentials", "DBX_SP_Client_ID")
service_principal_password = spark_obj.get_secret("DBX_SP_Credentials", "DBX_SP_Client_Secret")


## AML Workspace Configs

In [None]:
from training import AzureMLConfiguration

# Bundle the Spark session, the secrets retrieved earlier and the workspace
# region into the project's Azure ML configuration object.
azure_ml_obj = AzureMLConfiguration(
    spark=spark,
    tenant_id=tenant_id,
    service_principal_id=service_principal_id,
    service_principal_password=service_principal_password,
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
    workspace_region="uksouth",
)

# Workspace handle handed to the experiment set-up cell.
# NOTE(review): presumably authenticates with the service principal above —
# confirm against AzureMLConfiguration.get_workspace_auth.
ws = azure_ml_obj.get_workspace_auth()

## Set MLflow Experiment


In [None]:
from training import MachineLearningExperiment

# Pick the experiment name, then give the project's experiment wrapper the
# Spark session, the parsed command-line namespace and the Azure ML workspace.
experiment_name = "ciaran_experiment_nyc_taxi"
ml_ex_obj = MachineLearningExperiment(
    spark=spark,
    experiment_name=experiment_name,
    namespace=namespace,
    aml_workspace=ws,
)

## Start MLflow Run

In [None]:
# Close any run that is still active (for example from a previous execution of
# this notebook) so the start_run() call below opens a fresh one.
mlflow.end_run()
# Turn on autologging; exclusive=False allows autologged content to be recorded
# in a user-created run such as the one started on the next line.
mlflow.autolog(exclusive=False)
# Open the run that the remaining cells log to; it is closed at the end of the notebook.
mlflow.start_run()

## Feature Table Refresh

In [None]:
from feature_tbl_refresh import fs_refresh

In [None]:
# NOTE(review): `spark` is rebound here, replacing the handle taken from
# spark_obj earlier in the notebook — confirm the rebinding is intentional.
spark = SparkSession.builder.getOrCreate()

# Read the trip table and pass it to fs_refresh for the given date window.
taxi_data = spark.read.table("feature_store_taxi_example.nyc_yellow_taxi_with_zips")
fs_refresh(
    raw_data=taxi_data,
    spark=spark,
    start_date=datetime(2016, 1, 15),
    end_date=datetime(2016, 5, 29),
)

## Train Model 

#### Retrieve Data


In [None]:

# Source table location and the data version to load from it.
# NOTE(review): fs_data_version presumably selects a table version — confirm
# against MachineLearningExperiment.load_data.
fs_data_version = 0
data_path = "dbfs:/user/hive/warehouse/feature_store_taxi_example.db/nyc_yellow_taxi_with_zips"
taxi_data = ml_ex_obj.load_data(
    spark=spark,
    data_path=data_path,
    fs_data_version=fs_data_version,
)



#### Apply Feature Selection For Training Data

In [None]:
# Feature Store client handed to the training-data helper below.
fs = feature_store.FeatureStoreClient()

# Features looked up from the pickup feature table.
pickup_feature_lookups = ml_ex_obj.feature_lookup(
    feature_table_name="feature_store_taxi_example.trip_pickup_features",
    feature_lookups=[
        "mean_fare_window_1h_pickup_zip",
        "count_trips_window_1h_pickup_zip",
    ],
    lookup_key=["pickup_zip", "rounded_pickup_datetime"],
)

# Features looked up from the dropoff feature table.
dropoff_feature_lookups = ml_ex_obj.feature_lookup(
    feature_table_name="feature_store_taxi_example.trip_dropoff_features",
    feature_lookups=[
        "count_trips_window_30m_dropoff_zip",
        "dropoff_is_weekend",
    ],
    lookup_key=["dropoff_zip", "rounded_dropoff_datetime"],
)

# Combine both lookup lists and build the training data with fare_amount as the
# label; the two rounded timestamp columns are passed as columns to exclude.
# NOTE(review): the method name is spelled "get_taining_data" here — confirm it
# matches the definition in the training module before renaming anything.
training_df, training_set = ml_ex_obj.get_taining_data(
    fs,
    taxi_data,
    pickup_feature_lookups + dropoff_feature_lookups,
    label="fare_amount",
    exclude_columns=["rounded_pickup_datetime", "rounded_dropoff_datetime"],
)

#### Train

In [None]:
# Train via the wrapper's train_model_lgbm (LightGBM, judging by the method
# name and parameters) using the regression objective and RMSE metric below.
model_name = "taxi_example_fare_packaged"
model = ml_ex_obj.train_model_lgbm(
    training_df,
    training_set,  # feature-store object from before the DataFrame conversion in the previous cell
    fs,
    model_params={"num_leaves": 32, "objective": "regression", "metric": "rmse"},
    model_name=model_name,
)

#### Model Evaluation

In [None]:
# Re-read the nyc_yellow_taxi_with_zips table; this rebinds `taxi_data`,
# replacing the frame that was loaded for training.
# NOTE(review): no evaluation step follows in this cell — confirm whether one is missing.
taxi_data = spark.read.table("feature_store_taxi_example.nyc_yellow_taxi_with_zips")

## End MLflow Run

In [None]:
# Close the run opened by mlflow.start_run() earlier in the notebook.
mlflow.end_run()