## Apache-spark-azure-machine-learning-tutorial.

## Apache-spark-azure-machine-learning-tutorial.

En este tutorial, utilizará el aprendizaje automático automatizado en Azure Machine Learning para crear un modelo de regresión para predecir los precios de las tarifas de taxi. Este proceso llega al mejor modelo aceptando datos de entrenamiento y ajustes de configuración, e iterando automáticamente a través de combinaciones de diferentes métodos, modelos y ajustes de hiperparámetros. Este esta badado en https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/synapse-analytics/spark/apache-spark-azure-machine-learning-tutorial.md

En este tutorial se muestra como:
- Crear un grupo de Apache Spark sin servidor usando Synapse Studio
- Descargar los datos mediante Apache Spark y Azure Open Datasets.
- Transformar y limpiar datos mediante Apache Spark DataFrames.
- Entrenar un modelo de regresión lineal.
- Calcular la precisión del modelo.



Debido a que los datos sin procesar están en formato Parquet, puede usar el contexto Spark para extraer el archivo directamente a la memoria como un DataFrame. Cree un Spark DataFrame recuperando los datos a través de la API Open Datasets. Aquí, utiliza el esquema Spark DataFrame en las propiedades de lectura para inferir los tipos de datos y el esquema.

In [30]:
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""

# Allow Spark to read from the blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),blob_sas_token)

# Spark read parquet; note that it won't load any data yet
df = spark.read.parquet(wasbs_path)


StatementMeta(recopatrspark, 16, 2, Finished, Available)

Dependiendo del tamaño de su grupo de Spark, los datos sin procesar pueden ser demasiado grandes o tardar demasiado en operar con ellos. Puede filtrar estos datos a algo más pequeño, como un mes de datos, utilizando los filtros start_date y end_date. Después de filtrar un DataFrame, también ejecuta la función describe() en el nuevo DataFrame para ver estadísticas resumidas para cada campo.

Según las estadísticas resumidas, puede ver que hay algunas irregularidades en los datos. Por ejemplo, las estadísticas muestran que la distancia mínima de viaje es inferior a 0. Es necesario filtrar estos puntos de datos irregulares.

In [31]:
# Create an ingestion filter
start_date = '2015-01-01 00:00:00'
end_date = '2015-12-31 00:00:00'

filtered_df = df.filter('tpepPickupDateTime > "' + start_date + '" and tpepPickupDateTime< "' + end_date + '"')

filtered_df.describe().show()

StatementMeta(recopatrspark, 16, 3, Finished, Available)

+-------+------------------+------------------+------------------+------------+------------+------------------+-----------------+------------------+------------------+------------------+---------------+------------------+------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+---------+------------------+
|summary|          vendorID|    passengerCount|      tripDistance|puLocationId|doLocationId|          startLon|         startLat|            endLon|            endLat|        rateCodeId|storeAndFwdFlag|       paymentType|        fareAmount|             extra|             mtaTax|improvementSurcharge|         tipAmount|        tollsAmount|       totalAmount|   puYear|           puMonth|
+-------+------------------+------------------+------------------+------------+------------+------------------+-----------------+------------------+------------------+------------------+---------------+------------------+---

Genere funciones a partir del conjunto de datos seleccionando un conjunto de columnas y creando varias funciones basadas en el tiempo desde el campo de fecha y hora de recogida. Filtre los valores atípicos que se identificaron en el paso anterior y luego elimine las últimas columnas porque no son necesarias para el entrenamiento.

In [32]:
from datetime import datetime
from pyspark.sql.functions import *

# To make development easier, faster, and less expensive, downsample for now
sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)

taxi_df = sampled_taxi_df.select('vendorID', 'passengerCount', 'tripDistance',  'startLon', 'startLat', 'endLon' \
                                , 'endLat', 'paymentType', 'fareAmount', 'tipAmount'\
                                , column('puMonth').alias('month_num') \
                                , date_format('tpepPickupDateTime', 'hh').alias('hour_of_day')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('day_of_week')\
                                , dayofmonth(col('tpepPickupDateTime')).alias('day_of_month')
                                ,(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('trip_time'))\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 200)\
                                & (sampled_taxi_df.rateCodeId <= 5)\
                                & (sampled_taxi_df.paymentType.isin({"1", "2"})))
taxi_df.show(10)

StatementMeta(recopatrspark, 16, 4, Finished, Available)

+--------+--------------+------------+------------------+------------------+------------------+------------------+-----------+----------+---------+---------+-----------+-----------+------------+---------+
|vendorID|passengerCount|tripDistance|          startLon|          startLat|            endLon|            endLat|paymentType|fareAmount|tipAmount|month_num|hour_of_day|day_of_week|day_of_month|trip_time|
+--------+--------------+------------+------------------+------------------+------------------+------------------+-----------+----------+---------+---------+-----------+-----------+------------+---------+
|       2|             2|        1.49|-73.99262237548828|  40.7373161315918|-73.98426818847656| 40.75123596191406|          2|      16.0|      0.0|        3|         05|    Tuesday|          10|     1671|
|       2|             1|        0.58|-73.94739532470703|  40.7716178894043| -73.9542007446289|40.767921447753906|          2|       4.5|      0.0|        3|         11|     Monday

## Generar conjuntos de datos de prueba y validación
Una vez que tenga su conjunto de datos final, puede dividir los datos en conjuntos de entrenamiento y prueba utilizando la función random_split en Spark. Al utilizar las ponderaciones proporcionadas, esta función divide aleatoriamente los datos en el conjunto de datos de entrenamiento para el entrenamiento del modelo y el conjunto de datos de validación para las pruebas.

In [33]:
# Random split dataset using Spark; convert Spark to pandas
training_data, validation_data = taxi_df.randomSplit([0.8,0.2], 223)

StatementMeta(recopatrspark, 16, 5, Finished, Available)

## Conéctese a un área de trabajo de Azure Machine Learning
En Azure Machine Learning, un área de trabajo es una clase que acepta su suscripción de Azure e información de recursos. También crea un recurso en la nube para monitorear y rastrear las ejecuciones de su modelo. En este paso, creará un objeto de área de trabajo a partir del área de trabajo de Azure Machine Learning existente.

In [34]:
import azureml.core

from azureml.core import Experiment, Workspace, Dataset, Datastore
from azureml.data.dataset_factory import TabularDatasetFactory

StatementMeta(recopatrspark, 16, 6, Finished, Available)

In [35]:
#Enter details of your Azure Machine Learning workspace
subscription_id = '6317b93a-f64a-41c2-b939-d78a49d7707b'
resource_group = 'salazarpamela3-rg'
workspace_name = 'reconocimiento_patrones_inv3'
experiment_name = "aml-synapse-regression"


ws = Workspace(subscription_id = subscription_id, resource_group = resource_group, workspace_name = workspace_name)
experiment = Experiment(ws, experiment_name)

StatementMeta(recopatrspark, 16, 7, Finished, Available)

In [36]:
datastore = Datastore.get_default(ws)
dataset = TabularDatasetFactory.register_spark_dataframe(training_data, datastore, name = experiment_name + "-dataset")

StatementMeta(recopatrspark, 16, 8, Finished, Available)

Method register_spark_dataframe: This is an experimental method, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Validating arguments.
Arguments validated.
Writing spark dataframe to managed-dataset/615c3d14-dd85-4d59-862b-d996a1360627
Creating new dataset
Registering new dataset
Successfully created and registered a new dataset.


## Entrenar modelo

In [37]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

StatementMeta(recopatrspark, 16, 9, Finished, Available)

In [40]:
assembler = VectorAssembler(
    inputCols=["passengerCount","tripDistance","startLon","startLat","endLon",
                "endLat","fareAmount","tipAmount","month_num","day_of_month","trip_time"],
    outputCol="features")

data = assembler.transform(taxi_df)


train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

StatementMeta(recopatrspark, 16, 12, Finished, Available)

In [41]:
from azureml.core import Run

StatementMeta(recopatrspark, 16, 13, Finished, Available)

In [53]:
with experiment.start_logging() as run:
    model_name = "LinearRegression"
    run.log("Model Name",model_name)
    lr = LinearRegression(featuresCol="features", labelCol="fareAmount", predictionCol="predicted_fareAmount")
    lr_model = lr.fit(train_data)
    predictions = lr_model.transform(test_data)

    evaluator = RegressionEvaluator(labelCol="fareAmount", predictionCol="predicted_fareAmount", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

    evaluator_r2 = RegressionEvaluator(labelCol="fareAmount", predictionCol="predicted_fareAmount", metricName="r2")
    r2 = evaluator_r2.evaluate(predictions)
    print("R-squared (R2) on test data: {:.3f}".format(r2))

    # Log Performance
    run.log('RMSE', rmse)
    run.log("R2", r2)

StatementMeta(recopatrspark, 16, 25, Finished, Available)

Root Mean Squared Error (RMSE) on test data: 0.000
R-squared (R2) on test data: 1.000


In [42]:
lr = LinearRegression(featuresCol="features", labelCol="fareAmount", predictionCol="predicted_fareAmount")
lr_model = lr.fit(train_data)

StatementMeta(recopatrspark, 16, 14, Finished, Available)

In [43]:
predictions = lr_model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="fareAmount", predictionCol="predicted_fareAmount", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(labelCol="fareAmount", predictionCol="predicted_fareAmount", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

StatementMeta(recopatrspark, 16, 15, Finished, Available)

Root Mean Squared Error (RMSE) on test data: 0.000
R-squared (R2) on test data: 1.000


## 