Load the sparkmagic extension before starting.


In [1]:
# Load the sparkmagic extension; the %spark / %%spark magics in later cells depend on it.
%load_ext sparkmagic.magics

## Azure ML Setup

In [2]:
import os

from azureml.core import Workspace, Dataset, Datastore

In [3]:
# Azure ML workspace handle used by the local (non-%%spark) cells below.
ws = Workspace.from_config()

## Register Synapse ADLS Gen2 Account as a Datastore

Let's register the Synapse workspace's default ADLS Gen2 account as a datastore and make it the default datastore for Azure ML. This makes it easier to move data between Synapse and Azure ML when working with this specific workspace.

In [4]:
# Register the storage container as an Azure ML datastore.
# The account key is a secret and must never be stored in the notebook: export it
# as DATA4SYNAPSE_ACCOUNT_KEY before starting Jupyter (a KeyError here means the
# variable is not set).
# NOTE(review): the key that used to be hardcoded in this cell has been exposed
# and should be rotated in the storage account.
ds = Datastore.register_azure_blob_container(
    ws,
    datastore_name='data4synapse',
    container_name='default',
    account_name='data4synapse',
    account_key=os.environ['DATA4SYNAPSE_ACCOUNT_KEY'],
)

In [5]:
# Make the datastore registered above the workspace default, so that
# ws.get_default_datastore() in later cells resolves to it.
ds.set_as_default()

In [6]:
dsetdata = 'noaa-isd-files'
data_url = 'https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather'

if dsetdata not in ws.datasets:
    os.system('sudo chmod 777 /mnt')
    for year in range(2008, 2020+1):
        ds = Dataset.File.from_files(f'{data_url}/year={year}/month=*/*.parquet', validate=False)
        print('Downloading...')
        %time ds.download(f'/mnt/data/isd/year={year}', overwrite=True)
    print('Uploading...')
    %time ws.get_default_datastore().upload('/mnt/data/isd', '/noaa-isd', show_progress=False)
    ds = Dataset.File.from_files((ws.get_default_datastore(), '/noaa-isd/**/*.parquet'))
    ds = ds.register(ws, dsetdata)

In [7]:
dsetdata = 'noaa-isd-tabular'

# Register a tabular view over the parquet files under /noaa-isd in the default
# datastore, unless a dataset with this name already exists.
if dsetdata not in ws.datasets:
    parquet_location = (ws.get_default_datastore(), '/noaa-isd/**/*.parquet')
    ds = Dataset.Tabular.from_parquet_files(parquet_location)
    ds = ds.register(ws, dsetdata)

## Start Spark Session

In [8]:
# Start the remote Spark session. Replace "sparky" with any Synapse workspace
# and Spark pool you can access with AAD.
%spark start --workspace sparky --sparkpool sparky

To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code BBJTKLSYR to authenticate.
Starting Spark application ...
SparkSession available as 'spark'.


## Configure Spark Session

In [None]:
#%%spark config # TODO: learn how to configure Spark sessions 

## Work with data

In [9]:
%%spark

# Connect to the Azure ML workspace from inside the remote Spark session.
# The import lives in this cell because %%spark code runs in the remote
# interpreter, not in the local kernel.
from azureml.core import Workspace

subscription_id = '6560575d-fa06-4e7d-95fb-f962e74efd7a'
resource_group = 'cody-westus2-rg'
workspace_name = 'AzureML'

ws = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)
ws

Workspace.create(name='AzureML', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='cody-westus2-rg')

In [10]:
%%spark 

# Look up the registered tabular dataset by name in the remote session;
# the bare `ds` on the last line displays its description.
ds = ws.datasets['noaa-isd-tabular']
ds

{
  "source": [
    "('data4synapse', 'noaa-isd/**/*.parquet')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ReadParquetFile",
    "DropColumns"
  ],
  "registration": {
    "id": "3254db94-0c1d-416e-be99-98061ae4f479",
    "name": "noaa-isd-tabular",
    "version": 1,
    "workspace": "Workspace.create(name='AzureML', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='cody-westus2-rg')"
  }
}

In [11]:
%%spark

# NOTE: on this Spark pool the call fails with "Unable to get service host"
# (see the error output below). The following cells work around it by reading
# the parquet files directly from the storage account over abfss://.
df = ds.to_spark_dataframe()

An error was encountered:
An error occurred while calling o135.getFiles.
: java.lang.RuntimeException: Unable to get service host.
Workspace ID: 
Workspace ID Program Error: Could not find valid SPARK_HOME while searching ['/var/yarn-nm/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1584547190049_0003', '/usr/hdp/2.6.99.201-0/spark2/python/lib/pyspark.zip/pyspark', '/opt/spark/python/lib/pyspark.zip/pyspark', '/opt/spark/python/lib']
Service Url Program Error: Could not find valid SPARK_HOME while searching ['/var/yarn-nm/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1584547190049_0003', '/usr/hdp/2.6.99.201-0/spark2/python/lib/pyspark.zip/pyspark', '/opt/spark/python/lib/pyspark.zip/pyspark', '/opt/spark/python/lib']
	at com.microsoft.dprep.integration.azureml.ServiceDiscovery$.getHistoryUri(ServiceDiscovery.scala:37)
	at com.microsoft.dprep.integration.azureml.DatastoreServiceResolver.resolve(DatastoreResolver.scala:47)
	at com.microsoft.d

In [14]:
%%spark

# Build the abfss:// URI of the folder that holds the uploaded parquet files,
# using the account and container recorded on the workspace's default datastore.
dstore = ws.get_default_datastore()

account_name = dstore.account_name
container_name = dstore.container_name
# NOTE: the trailing slash is kept in adls_path, so appending '/...' to it
# later yields a double slash in the resulting path.
relative_path = 'noaa-isd/'

adls_path = f'abfss://{container_name}@{account_name}.dfs.core.windows.net/{relative_path}'
adls_path

'abfss://default@data4synapse.dfs.core.windows.net/noaa-isd/'

In [15]:
%%spark

# Read every parquet file under the year=*/month=* partition folders straight
# from storage, then print the first rows as a sanity check.
parquet_glob = f'{adls_path}/year=*/month=*/*.parquet'
df = sqlContext.read.parquet(parquet_glob)
df.show()

+------+-----+-------------------+--------+---------+---------+---------+---------+-----------+--------------+-------------+-----------------------+--------------------+----------+-----------+---------+------------------+---------------+------------+----+---+-------+
|  usaf| wban|           datetime|latitude|longitude|elevation|windAngle|windSpeed|temperature|seaLvlPressure|cloudCoverage|presentWeatherIndicator|pastWeatherIndicator|precipTime|precipDepth|snowDepth|       stationName|countryOrRegion|         p_k|year|day|version|
+------+-----+-------------------+--------+---------+---------+---------+---------+-----------+--------------+-------------+-----------------------+--------------------+----------+-----------+---------+------------------+---------------+------------+----+---+-------+
|999999|23909|2020-01-05 21:50:00|  37.634|  -91.723|    365.0|     null|     null|       12.0|          null|         null|                   null|                null|      null|       null|    

In [None]:
%%spark

# Print summary statistics for the DataFrame loaded above.
df.describe().show()

In [None]:
%%spark

# Display the SQLContext object available in the remote session.
sqlContext

In [None]:
%%spark

# Display the SparkContext object available in the remote session.
sc

### Show Session Details

In [None]:
# Show details of the current Spark session.
%spark info

### Stop Session

In [None]:
# Stop the current Spark session.
%spark stop

### Override Session Config

In [None]:
%%spark config
{
    "driverMemory":"8g",
    "driverCores":2,
    "executorMemory":"8g",
    "executorCores":2,
    "numExecutors":2
}

### Example: RDD operation

In [None]:
%%spark

import time
# Shared variables: a broadcast list readable on every executor and an
# accumulator that executors can add to.
b = sc.broadcast([3, 5])
a = sc.accumulator(0)

# Two small input RDDs built from Python collections.
RDD0 = sc.parallelize(list(range(5)))
RDD2 = sc.parallelize(list(range(10, 15)))

RDD1 = RDD0.cartesian(RDD2)
cached = RDD2.cache()  # exercise a cached RDD

# Sum each pair and add the first broadcast value; repartition to trigger a new stage.
RDD22 = RDD1.map(lambda pair: pair[0] + pair[1] + b.value[0])
RDD3 = RDD22.repartition(5)
RDD4 = RDD2.map(lambda value: 3 * value - b.value[0])

# Keep the even values of one branch and the odd values of the other.
RDD5 = RDD3.filter(lambda value: value % 2 == 0)
RDD6 = RDD4.filter(lambda value: value % 2 != 0)

# Pair both branches up, flatten each pair back into its two members,
# then append the cached RDD.
RDD7 = RDD5.cartesian(RDD6)
RDD8 = RDD7.flatMap(lambda pair: [pair[0], pair[1]])
RDD9 = RDD8.union(cached)

# Simple sum over all elements.
ans = RDD9.reduce(lambda left, right: left + right)
print(ans)
def f(x):
    """Add ``x`` to the accumulator ``a`` after a short pause."""
    time.sleep(0.7)  # make the job run a little longer
    a.add(x)


# Run f on every element, then read the accumulated total.
RDD9.foreach(f)
print(a.value)

### Example: Spark Pi

In [None]:
%%spark

from random import random
from operator import add

partitions = 10  # number of slices passed to sc.parallelize below
n = 100000 * partitions  # total number of random points to sample

def f(_):
    """Sample one random point in [-1, 1) x [-1, 1) and test it against the unit circle.

    The argument is ignored; it exists so the function can be passed to ``map``.
    Returns 1 when the point lies inside or on the unit circle, otherwise 0.
    """
    x_coord = 2 * random() - 1
    y_coord = 2 * random() - 1
    inside_unit_circle = x_coord ** 2 + y_coord ** 2 <= 1
    return int(inside_unit_circle)

# Distribute n sample indices over the partitions, score each one with f,
# and add up the hits; the hit ratio times 4 approximates Pi.
sample_ids = sc.parallelize(range(1, n + 1), partitions)
count = sample_ids.map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))