# Document Reference
Submit Spark jobs in Azure Machine Learning (preview) [[link](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-submit-spark-jobs?tabs=cli)]

Interactive Data Wrangling with Apache Spark in Azure Machine Learning (preview) [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml)]

- Import and wrangle data from Azure Machine Learning Datastore [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#import-and-wrangle-data-from-azure-machine-learning-datastore)]
- Import and wrangle data from Azure Blob storage [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#import-and-wrangle-data-from-azure-blob-storage)]
- Import and wrangle data from Azure Data Lake Storage (ADLS) Gen 2 [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#import-and-wrangle-data-from-azure-data-lake-storage-adls-gen-2)]
- Accessing data on the default file share [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#accessing-data-on-the-default-file-share)]
 

Identity used to access data
 - Credential [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#import-and-wrangle-data-from-azure-blob-storage)]
 - Credential-less [[link](https://learn.microsoft.com/en-us/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml#import-and-wrangle-data-from-azure-blob-storage)]


# Configure Session (Optional)

Use the "Configure Session" panel at the bottom to adjust the session settings.

# Read data from ADLS Gen 2 using a URI

Read in your raw data. Supported URIs include:

- AzureML Datastore URIs e.g. azureml://subscriptions/X/resourcegroups/X/workspaces/X/datastores/X/paths/X
- Azure Storage URIs e.g. abfss://filesystem@account.dfs.core.windows.net/data/path
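The two URI shapes listed above can be assembled from their components. The following is a minimal sketch; the helper names `datastore_uri` and `abfss_uri` are hypothetical and are not part of any Azure SDK.

```python
def datastore_uri(subscription, resource_group, workspace, datastore, path):
    """Build an AzureML datastore URI (hypothetical helper, not an SDK call)."""
    return (
        f"azureml://subscriptions/{subscription}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace}"
        f"/datastores/{datastore}"
        f"/paths/{path.lstrip('/')}"
    )


def abfss_uri(filesystem, account, path):
    """Build an ADLS Gen 2 URI (hypothetical helper, not an SDK call)."""
    return f"abfss://{filesystem}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


# Reproduces the input URI used in this notebook.
print(abfss_uri("data", "feli1storegen2", "input"))
# abfss://data@feli1storegen2.dfs.core.windows.net/input
```

Either string can then be passed to a Spark reader such as `spark.read.parquet`.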

## Read data into a Spark DataFrame

In [1]:
df = spark.read.parquet("abfss://data@feli1storegen2.dfs.core.windows.net/input")
display(df)

# Alternative: read the data with the pandas API on Spark
# import pyspark.pandas as ps
# df = ps.read_parquet("abfss://data@adlseastus86462.dfs.core.windows.net/green")


## Clean the data

In [26]:
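# Remove outliers: keep trips with a positive distance below 500 and at least one passenger
clean_data = df.filter((df.trip_distance>0) & (df.trip_distance<500) & (df.passenger_count>0))

# Drop columns that are not needed for training
clean_data = clean_data.drop('puLocationId',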
 'doLocationId',
 'pickupLongitude',
 'pickupLatitude',
 'dropoffLongitude',
 'dropoffLatitude',
 'rateCodeID',
 'storeAndFwdFlag',
 'extra',
 'mtaTax',
 'improvementSurcharge',
 'ehailFee',
 'tripType',
 'puYear',
 'puMonth')


## Feature Engineering

In [27]:
from pyspark.sql import functions as F

# Hour of day at pickup
training_data = clean_data.withColumn('puHour', F.hour('lpep_pickup_datetime'))
# Trip duration in minutes (unix_timestamp returns seconds)
training_data = training_data.withColumn('duration', (F.unix_timestamp('lpep_dropoff_datetime')-F.unix_timestamp('lpep_pickup_datetime'))/60)
# Average speed: distance covered per minute of trip time
training_data = training_data.withColumn('distanceOverDuration', F.col('trip_distance')/F.col('duration'))
display(training_data)

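To make the derived columns concrete, the same arithmetic can be worked through for a single trip in plain Python. This is only an illustrative sketch: the function `trip_features` and the sample timestamps are hypothetical, and returning `None` for a zero duration assumes that the Spark session yields null on division by zero (ANSI mode disabled).

```python
from datetime import datetime


def trip_features(pickup, dropoff, trip_distance):
    """Compute puHour, duration (minutes) and distanceOverDuration for one trip."""
    duration = (dropoff - pickup).total_seconds() / 60
    # Guard against zero-length trips instead of raising ZeroDivisionError.
    speed = trip_distance / duration if duration else None
    return {
        "puHour": pickup.hour,
        "duration": duration,
        "distanceOverDuration": speed,
    }


# Hypothetical trip: 15 minutes long, 3.0 distance units.
features = trip_features(
    datetime(2019, 7, 16, 11, 2, 0),
    datetime(2019, 7, 16, 11, 17, 0),
    3.0,
)
print(features)
# {'puHour': 11, 'duration': 15.0, 'distanceOverDuration': 0.2}
```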

## Write & version training data to ADLS Gen 2

In [28]:
training_data.write.format("delta").mode('overwrite').save("abfss://data@feli1storegen2.dfs.core.windows.net/output")

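Because the data is written in Delta format, each overwrite is recorded as a new table version, and an earlier version can be read back with Delta's `versionAsOf` reader option. The sketch below assumes a session with Delta Lake support; the helper name `read_delta_version` is hypothetical.

```python
def read_delta_version(spark, path, version):
    """Load one specific version of a Delta table (hypothetical helper).

    `spark` is an active SparkSession with Delta Lake available,
    `path` is the table location, and `version` is the integer
    table version to read.
    """
    return (
        spark.read.format("delta")
        .option("versionAsOf", version)
        .load(path)
    )
```

In this notebook, a call such as `read_delta_version(spark, "abfss://data@feli1storegen2.dfs.core.windows.net/output", 0)` would return the first version written to the output path, provided that version still exists.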

# Read data from default file share
The default file share is mounted on the Spark cluster.

In [23]:
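import os
import pyspark.pandas as ps  # pandas API on Spark; "ps" avoids confusion with plain pandas

# Build a file:// URI that points into the mounted default file share
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/feli1/test_data_10k.csv"

df = ps.read_csv(file)
df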



| Unnamed: 0 | requestMinute | app | subscriptionId | resourceGroup | workspaceName | datasetid | activityName | activityGroup | clientType | requests |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2019-07-16 11:02:00 | dataset | e5388881-739a-49db-982b-6a4b412179ed | test-rg-centralus-2019W29 | ds-centralus | 4a56d61d-d2dc-42b5-a6b9-008bf86dac2b | GET Datasets/GetDatasetById | Retrieve Dataset | SDK | 2 |
| 1 | 2019-07-16 11:02:00 | dataset | a02ce7fd-1f09-4cd2-855d-49b67aef9c20 | test-rg-westus-2019W29 | ds-westus | 29a3463c-429c-4119-850d-be3ac0fa8472 | GET Datasets/GetDatasetById | Retrieve Dataset | SDK | 2 |
| 2 | 2019-07-16 11:02:00 | dataset | e5388881-739a-49db-982b-6a4b412179ed | test-rg-centralus-2019W29 | ds-centralus | 30b3001c-e9c5-4f4a-9127-58e97111d70c | GET Datasets/GetDatasetByName | Retrieve Dataset | SDK | 4 |
| 3 | 2019-07-16 11:02:00 | dataset | 415578b2-69eb-467f-9275-44d2083bbc1a | test-rg-northcentralus-2019W29 | ds-northcentralus | c05849bc-6ff4-41a6-bdd8-ca85d4f31682 | POST Datasets/Register | Register Dataset | SDK | 1 |
| 4 | 2019-07-16 11:01:00 | dataset | 415578b2-69eb-467f-9275-44d2083bbc1a | test-rg-northcentralus-2019W29 | ds-northcentralus | 58803835-8523-44aa-96c6-5eaca2241721 | GET Datasets/GetDatasetById | Retrieve Dataset | SDK | 2 |
| 5 | 2019-07-16 11:02:00 | dataset | e5388881-739a-49db-982b-6a4b412179ed | test-rg-centralus-2019W29 | ds-centralus | 70605b43-8d73-490d-908e-cdeb410602a1 | POST Datasets/Register | Register Dataset | SDK | 1 |
| 6 | 2019-07-16 11:02:00 | dataset | 415578b2-69eb-467f-9275-44d2083bbc1a | test-rg-northcentralus-2019W29 | ds-northcentralus | d3c9ede5-2c93-44d3-9d1b-5508d2eb6c51 | GET Datasets/GetDatasetByName | Retrieve Dataset | SDK | 4 |
| 7 | 2019-07-16 11:02:00 | dataset | a02ce7fd-1f09-4cd2-855d-49b67aef9c20 | test-rg-westus-2019W29 | ds-westus | 595668a3-3aa5-427e-b8f9-8aff794b7a66 | POST Datasets/Register | Register Dataset | SDK | 1 |
| 8 | 2019-07-16 11:03:00 | dataset | e5388881-739a-49db-982b-6a4b412179ed | test-rg-centralus-2019W29 | ds-centralus | f82a8162-5089-4245-b1d7-eadc2249858d | POST Datasets/Register | Register Dataset | SDK | 1 |
| 9 | 2019-07-16 11:03:00 | dataset | a02ce7fd-1f09-4cd2-855d-49b67aef9c20 | test-rg-westus-2019W29 | ds-westus | b0413314-3d69-4aa5-a17d-c00755b201bd | POST Datasets/Register | Register Dataset | SDK | 1 |
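The `file://` URI for a path on the mounted file share can also be built with `pathlib` rather than string concatenation, which percent-encodes special characters in the path. This is a minimal sketch; the helper name `file_share_uri` is hypothetical, and it assumes the working directory is the root of the mounted share, as in the cell above.

```python
from pathlib import Path


def file_share_uri(relative_path, base="."):
    """Return a file:// URI for a path relative to `base` (hypothetical helper)."""
    # resolve() makes the path absolute, which as_uri() requires.
    return (Path(base).resolve() / relative_path).as_uri()


# The printed value depends on the current working directory.
print(file_share_uri("Users/feli1/test_data_10k.csv"))
```

The resulting string can be passed to `read_csv` in place of the concatenated path.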


# Notes

The file share is mounted only in interactive sessions, not in batch jobs.

For hobo Spark batch jobs, only SDK v2 is supported.

