#### Referências
- https://learn.microsoft.com/en-us/fabric/data-science/fabric-sparkml-tutorial

#### Obtenção dos dados

In [None]:
from azureml.opendatasets import NycTlcYellow
import pandas as pd
from dateutil import parser

# Download NYC yellow-taxi trips between start_date and end_date from Azure Open Datasets.
end_date = parser.parse('2018-05-05 00:00:00')
start_date = parser.parse('2018-05-01 00:00:00')
nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()

# Sample 1: Using a specific seed -> data used for training and testing.
# replace=True samples with replacement, so the same trip can appear more than once.
nyc_tlc_pandas_sample_1 = nyc_tlc_pd.sample(
    frac=0.001,
    replace=True,
    random_state=1234
)

# Persist the SAMPLE (previously the full frame was written, making the sampling a no-op).
# index=False stops pandas' row index from coming back as an extra "Unnamed: 0" column.
# NOTE(review): the file is written relative to the working directory, while the loading
# cell reads an absolute path under /home/jovyan/work — confirm they point to the same place.
nyc_tlc_pandas_sample_1.to_csv("nyc_tlc_train_test.csv", index=False)

# Sample 2: Using a different seed -> data reserved for later prediction.
# NOTE(review): both samples are drawn independently from the same frame, so they may
# share trips with the training/test sample.
nyc_tlc_pandas_sample_2 = nyc_tlc_pd.sample(
    frac=0.001,
    replace=True,
    random_state=5678
)

nyc_tlc_pandas_sample_2.to_csv("nyc_tlc_predict.csv", index=False)

#### Import the libraries

In [None]:
from datetime import datetime
from pyspark.sql.functions import unix_timestamp, date_format, col, when
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession
import pandas as pd

#### Criação da conexão Spark

In [None]:
# Spark settings applied to the session builder, in this order, after the app name and master.
# NOTE(review): with master "local[*]" the executor / dynamic-allocation settings are
# probably ignored, and driver memory only applies if the JVM is not already running — confirm.
_spark_settings = {
    "spark.driver.memory": "2g",
    "spark.driver.cores": "2",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "2",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "4",
    "spark.python.worker.timeout": "600",
}

_session_builder = SparkSession.builder.appName("Spark Application").master("local[*]")
for _setting_key, _setting_value in _spark_settings.items():
    _session_builder = _session_builder.config(_setting_key, _setting_value)

# Reuses an existing session if one is already active in this kernel.
spark = _session_builder.getOrCreate()

#### Build the input PySpark DataFrame

In [None]:
# Location of the training/test CSV produced by the data-acquisition cell.
# NOTE(review): that cell writes the file relative to the working directory; this
# assumes the notebook runs from /home/jovyan/work — confirm.
_data_dir = "/home/jovyan/work"
sample_url = f"{_data_dir}/nyc_tlc_train_test.csv"

In [None]:
# Read the CSV with pandas and hand it straight to Spark.
# Despite its name, `nyc_tlc_pandas_sampled` ends up holding the Spark DataFrame;
# the name is kept because the data-preparation cell selects from it.
nyc_tlc_pandas_sampled = spark.createDataFrame(pd.read_csv(sample_url))

#### Prepare the data

In [None]:
# Project the trip columns, derive time features and the binary `tipped` label,
# then keep only trips whose values look plausible.
taxi_df = (
    nyc_tlc_pandas_sampled
    .select(
        'totalAmount',
        'fareAmount',
        'tipAmount',
        'paymentType',
        'rateCodeId',
        'passengerCount',
        'tripDistance',
        'tpepPickupDateTime',
        'tpepDropoffDateTime',
        # 'HH' is hour-of-day 00-23. The former 'hh' pattern is the 12-hour clock (01-12),
        # so hour-based conditions such as >= 16 or >= 20 could never match.
        date_format('tpepPickupDateTime', 'HH').alias('pickupHour'),
        # Full weekday name of the pickup, e.g. "Monday".
        date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
        # Trip duration in seconds.
        # NOTE(review): assumes the timestamp strings parse with Spark's default
        # 'yyyy-MM-dd HH:mm:ss' format after the CSV round-trip — confirm.
        (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
        # Label: 1 when any tip was given, otherwise 0.
        (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
    )
    .filter(
        (col('passengerCount') > 0) &
        (col('passengerCount') < 8) &
        (col('tipAmount') >= 0) &
        (col('tipAmount') <= 25) &
        (col('fareAmount') >= 1) &
        (col('fareAmount') <= 250) &
        (col('tipAmount') < col('fareAmount')) &
        (col('tripDistance') > 0) &
        (col('tripDistance') <= 100) &
        (col('rateCodeId') <= 5) &
        # Keep only payment types "1" and "2" (presumably card and cash per the TLC
        # data dictionary — confirm).
        (col('paymentType').isin("1", "2"))
    )
)

In [None]:
# Bucket the pickup hour into coarse traffic periods and keep trips lasting
# between 30 seconds and 2 hours (7200 s), bounds inclusive.
_pickup_hour = col('pickupHour')  # string output of date_format; Spark coerces it for the comparisons

_traffic_time_bins = (
    when((_pickup_hour <= 6) | (_pickup_hour >= 20), "Night")
    .when(_pickup_hour.between(7, 10), "AMRush")
    .when(_pickup_hour.between(11, 15), "Afternoon")
    .when(_pickup_hour.between(16, 19), "PMRush")
    .otherwise("Unknown")  # string fallback so the column stays a single (string) type
    .alias('trafficTimeBins')
)

_featurised_columns = [
    'totalAmount',
    'fareAmount',
    'tipAmount',
    'paymentType',
    'passengerCount',
    'tripDistance',
    'weekdayString',
    'pickupHour',
    'tripTimeSecs',
    'tipped',
]

taxi_featurised_df = (
    taxi_df
    .select(*_featurised_columns, _traffic_time_bins)
    .filter(col('tripTimeSecs').between(30, 7200))
)

#### Encode the categorical features for the LR model

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Each categorical string column is mapped to an index column, which is then one-hot
# encoded into a vector column. dropLast=False keeps one vector slot per category.
sI1, sI2 = (
    StringIndexer(inputCol=source_col, outputCol=index_col)
    for source_col, index_col in (
        ("trafficTimeBins", "trafficTimeBinsIndex"),
        ("weekdayString", "weekdayIndex"),
    )
)

en1, en2 = (
    OneHotEncoder(dropLast=False, inputCol=index_col, outputCol=vector_col)
    for index_col, vector_col in (
        ("trafficTimeBinsIndex", "trafficTimeBinsVec"),
        ("weekdayIndex", "weekdayVec"),
    )
)

# Order matters: each encoder consumes the index produced by the indexer before it.
pipeline = Pipeline(stages=[sI1, en1, sI2, en2])

# NOTE(review): the encoding is fitted on all rows, including those that the later
# randomSplit assigns to the test set.
encoded_final_df = pipeline.fit(taxi_featurised_df).transform(taxi_featurised_df)


#### Start MLflow tracking

In [None]:
import mlflow

# All training runs of this notebook are grouped under this MLflow experiment.
EXPERIMENT_NAME = 'nyc_traffic_taxi'
mlflow.set_experiment(EXPERIMENT_NAME)

In [None]:
from datetime import datetime

# Wall-clock timestamp ("day/month/year - hour:minute:second") used to tag MLflow runs.
now = datetime.now()
current_time = f"{now:%d/%m/%Y - %H:%M:%S}"
current_time

#### Train and test the LR Model

In [None]:
# Reproducible 70/30 train/test split; the fixed seed makes the split repeatable.
seed = 1234
training_fraction = 0.7
testing_fraction = 1 - training_fraction

train_data_df, test_data_df = encoded_final_df.randomSplit(
    [training_fraction, testing_fraction], seed
)

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Model formula: predict `tipped` from trip, time and payment features.
# RFormula emits the assembled feature vector and a "label" column (its default
# output names), which the evaluator below reads.
classFormula = RFormula(
    formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec"
)

# Area under ROC from the model's continuous rawPrediction scores.
# The previous code passed (label, prediction) to mllib's BinaryClassificationMetrics,
# which expects (score, label): the pair was reversed, and using the hard 0/1
# prediction as the score collapses the ROC curve to a single threshold.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

# Sweep the regularization strength over 0.1, 0.2, ..., 1.0, one MLflow run per value.
for i in range(1, 11):

    print("LR TRAIN AND TEST: ", i)

    # round() avoids logging float noise such as 0.30000000000000004.
    reg_param = round(0.1 * i, 1)

    #-----MLFLOW-----
    run_name = f'lr_model_train_{i}'

    parametros = {
        "reg_param": reg_param,
    }

    # The context manager ends the run even if training or logging raises, so a
    # failure cannot leave an active run that makes the next start_run fail.
    with mlflow.start_run(run_name=run_name):

        # Create a new Logistic Regression object for the model
        logReg = LogisticRegression(
            maxIter=10,
            regParam=reg_param,
            labelCol='tipped'
        )

        # Create a Pipeline with the formula and logistic regression stages
        pipeline = Pipeline(stages=[classFormula, logReg])

        # Train the logistic regression model
        lrModel = pipeline.fit(train_data_df)

        # Predict tip 1/0 (yes/no) on the test dataset and evaluate using area under ROC
        predictions = lrModel.transform(test_data_df)
        area_under_roc = evaluator.evaluate(predictions)

        metricas = {
            "area_under_roc": area_under_roc
        }

        print("area_under_roc: ", area_under_roc)

        mlflow.set_tag("data", current_time)
        mlflow.log_params(parametros)
        mlflow.log_metrics(metricas)
        mlflow.log_artifact(sample_url)

        mlflow.spark.log_model(spark_model=lrModel, artifact_path="lr_model")

    print("-"*50)

#### Stop the Spark session

In [None]:
# Stop the SparkSession (`spark`) created in the connection cell at the top of the notebook.
spark.stop()