In [9]:
import azureml.core

from azureml.core import Experiment, Workspace, Dataset, Datastore
from azureml.train.automl import AutoMLConfig
from notebookutils import mssparkutils
from azureml.data.dataset_factory import TabularDatasetFactory

StatementMeta(mmsparkpool, 57, 9, Finished, Available)

In [10]:
linkedService_name = "LS_AML"
experiment_name = "mm-synapse-base-NYC"

ws = mssparkutils.azureML.getWorkspace(linkedService_name)
experiment = Experiment(ws, experiment_name)

StatementMeta(mmsparkpool, 57, 10, Finished, Available)

In [5]:
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""

# Allow Spark to read from the blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),blob_sas_token)

# Spark read parquet; note that it won't load any data yet
df = spark.read.parquet(wasbs_path)

StatementMeta(mmsparkpool, 57, 5, Finished, Available)

In [6]:
# Create an ingestion filter
start_date = '2015-01-01 00:00:00'
end_date = '2015-12-31 00:00:00'

filtered_df = df.filter('tpepPickupDateTime > "' + start_date + '" and tpepPickupDateTime< "' + end_date + '"')

filtered_df.describe().show()

StatementMeta(mmsparkpool, 57, 6, Finished, Available)

+-------+------------------+------------------+------------------+------------+------------+------------------+-----------------+------------------+------------------+------------------+---------------+-------------------+------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+---------+------------------+
|summary|          vendorID|    passengerCount|      tripDistance|puLocationId|doLocationId|          startLon|         startLat|            endLon|            endLat|        rateCodeId|storeAndFwdFlag|        paymentType|        fareAmount|             extra|              mtaTax|improvementSurcharge|         tipAmount|       tollsAmount|       totalAmount|   puYear|           puMonth|
+-------+------------------+------------------+------------------+------------+------------+------------------+-----------------+------------------+------------------+------------------+---------------+-------------------+

In [7]:
from datetime import datetime
from pyspark.sql.functions import *

# To make development easier, faster, and less expensive, downsample for now
sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)

taxi_df = sampled_taxi_df.select('vendorID', 'passengerCount', 'tripDistance',  'startLon', 'startLat', 'endLon' \
                                , 'endLat', 'paymentType', 'fareAmount', 'tipAmount'\
                                , column('puMonth').alias('month_num') \
                                , date_format('tpepPickupDateTime', 'hh').alias('hour_of_day')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('day_of_week')\
                                , dayofmonth(col('tpepPickupDateTime')).alias('day_of_month')
                                ,(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('trip_time'))\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 200)\
                                & (sampled_taxi_df.rateCodeId <= 5)\
                                & (sampled_taxi_df.paymentType.isin({"1", "2"})))
taxi_df.show(10)

StatementMeta(mmsparkpool, 57, 7, Finished, Available)

+--------+--------------+------------+------------------+------------------+------------------+------------------+-----------+----------+---------+---------+-----------+-----------+------------+---------+
|vendorID|passengerCount|tripDistance|          startLon|          startLat|            endLon|            endLat|paymentType|fareAmount|tipAmount|month_num|hour_of_day|day_of_week|day_of_month|trip_time|
+--------+--------------+------------+------------------+------------------+------------------+------------------+-----------+----------+---------+---------+-----------+-----------+------------+---------+
|       2|             2|        1.49|-73.99262237548828|  40.7373161315918|-73.98426818847656| 40.75123596191406|          2|      16.0|      0.0|        3|         05|    Tuesday|          10|     1671|
|       2|             1|        0.58|-73.94739532470703|  40.7716178894043| -73.9542007446289|40.767921447753906|          2|       4.5|      0.0|        3|         11|     Monday

In [8]:
datastore = Datastore.get_default(ws)
dataset = TabularDatasetFactory.register_spark_dataframe(taxi_df, datastore, name = experiment_name + "-dataset")


StatementMeta(mmsparkpool, 57, 8, Finished, Available)

Method register_spark_dataframe: This is an experimental method, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Validating arguments.
Arguments validated.
Writing spark dataframe to managed-dataset/b377d19f-8c78-4a6c-b732-566ab03fd550
Creating new dataset
Registering new dataset
Successfully created and registered a new dataset.