DataTransferStep : 

In certain cases, you will need to transfer data from one data location to another. For example, your data may be in Azure SQL Database and you may want to move it to Azure Data Lake storage. Or, your data is in an ADLS account and you want to make it available in the Blob storage. The built-in DataTransferStep class helps you transfer data in these situations.

Step 1: Import libraries

In [None]:
import os
import azureml.core
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.exceptions import ComputeTargetException
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import DataTransferStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

# here importing DataFactory compute as well.

step 2 : Initialize workspace

In [None]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

Register Datastores and create DataReferences.

In [None]:
# Here using azure blob storage as default 
from azureml.exceptions import UserErrorException

blob_datastore_name='MyBlobDatastore'
account_name=os.getenv("BLOB_ACCOUNTNAME_62", "<my-account-name>") # Storage account name
container_name=os.getenv("BLOB_CONTAINER_62", "<my-container-name>") # Name of Azure blob container
account_key=os.getenv("BLOB_ACCOUNT_KEY_62", "<my-account-key>") # Storage account key

try:
    blob_datastore = Datastore.get(ws, blob_datastore_name)
    print("Found Blob Datastore with name: %s" % blob_datastore_name)
except UserErrorException:
    blob_datastore = Datastore.register_azure_blob_container(
        workspace=ws,
        datastore_name=blob_datastore_name,
        account_name=account_name, # Storage account name
        container_name=container_name, # Name of Azure blob container
        account_key=account_key) # Storage account key
    print("Registered blob datastore with name: %s" % blob_datastore_name)

blob_data_ref = DataReference(
    datastore=blob_datastore,
    data_reference_name="blob_test_data",
    path_on_datastore="testdata")

Step 3 : If we want to move data from/to blob store to/from Azure SQL database then use below:

In [None]:
sql_datastore_name="MySqlDatastore"
server_name=os.getenv("SQL_SERVERNAME_62", "<my-server-name>") # Name of SQL server
database_name=os.getenv("SQL_DATBASENAME_62", "<my-database-name>") # Name of SQL database
client_id=os.getenv("SQL_CLIENTNAME_62", "<my-client-id>") # client id of service principal with permissions to access database
client_secret=os.getenv("SQL_CLIENTSECRET_62", "<my-client-secret>") # the secret of service principal
tenant_id=os.getenv("SQL_TENANTID_62", "<my-tenant-id>") # tenant id of service principal

try:
    sql_datastore = Datastore.get(ws, sql_datastore_name)
    print("Found sql database datastore with name: %s" % sql_datastore_name)
except UserErrorException:
    sql_datastore = Datastore.register_azure_sql_database(
        workspace=ws,
        datastore_name=sql_datastore_name,
        server_name=server_name,
        database_name=database_name,
        client_id=client_id,
        client_secret=client_secret,
        tenant_id=tenant_id)
    print("Registered sql databse datastore with name: %s" % sql_datastore_name)

from azureml.data.sql_data_reference import SqlDataReference

sql_query_data_ref = SqlDataReference(
    datastore=sql_datastore,
    data_reference_name="sql_query_data_ref",
    sql_query="select top 1 * from TestData")

DataTransferStep is used to transfer data between Azure Blob, Azure Data Lake Store, and Azure SQL database.

name: Name of module
source_data_reference: Input connection that serves as source of data transfer operation.
destination_data_reference: Input connection that serves as destination of data transfer operation.
compute_target: Azure Data Factory to use for transferring data.
allow_reuse: Whether the step should reuse results of previous DataTransferStep when run with same inputs. Set as False to force data to be transferred again.
Optional arguments to explicitly specify whether a path corresponds to a file or a directory. These are useful when storage contains both file and directory with the same name or when creating a new destination path.

source_reference_type: An optional string specifying the type of source_data_reference. Possible values include: 'file', 'directory'. When not specified, we use the type of existing path or directory if it's a new path.
destination_reference_type: An optional string specifying the type of destination_data_reference. Possible values include: 'file', 'directory'. When not specified, we use the type of existing path or directory if it's a new path.

### Setup Data Factory Account


In [None]:
data_factory_name = 'adftest'

def get_or_create_data_factory(workspace, factory_name):
    try:
        return DataFactoryCompute(workspace, factory_name)
    except ComputeTargetException as e:
        if 'ComputeTargetNotFound' in e.message:
            print('Data factory not found, creating...')
            provisioning_config = DataFactoryCompute.provisioning_configuration()
            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
            data_factory.wait_for_completion()
            return data_factory
        else:
            raise e
            
data_factory_compute = get_or_create_data_factory(ws, data_factory_name)

print("Setup Azure Data Factory account complete")

### Create a DataTransferStep

 Step 4 : Transfer data from SQL to blob store

In [None]:
transfer_sql_to_blob = DataTransferStep(
    name="transfer_sql_to_blob",
    source_data_reference=sql_query_data_ref,
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute,
    destination_reference_type='file')

Build and Submit the Experiment

In [None]:
pipeline_01 = Pipeline(
    description="data_transfer_01",
    workspace=ws,
    steps=[transfer_sql_to_blob])

pipeline_run_01 = Experiment(ws, "Data_Transfer_example_01").submit(pipeline_01)
pipeline_run_01.wait_for_completion()

View Run Details

In [None]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run_01).show()