In [1]:
//Below two cells are Scala
val dataValues = Array(20,34,44,23,43)
val dataSet = sc.parallelize(dataValues) // parallelize() creates a RDD from the array, use Spark Context for this

StatementMeta(sparkpool, 1, 2, Finished, Available, Finished)

dataValues: Array[Int] = Array(20, 34, 44, 23, 43)
dataSet: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:30


In [2]:
val maxVal = dataSet.max
maxVal

StatementMeta(sparkpool, 1, 3, Finished, Available, Finished)

maxVal: Int = 44
res4: Int = 44


In [None]:
// For running the above code, attach this notebook to 'sparkpool' and 'scala' langugae. Then go to the Settings button on the right side
// and configure the current Spark session

In [3]:
//Below cells are Python

StatementMeta(sparkpool, 1, 4, Finished, Available, Finished)

In [4]:
courses = [(1,'CourseA',10.99), (2,'CourseB',12.99), (3,'CourseC',8.99)]
#Use spark session
df = spark.createDataFrame(courses, ['ID','CourseName', 'Price'])
display(df)

StatementMeta(sparkpool, 1, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 65bd522c-3700-4609-b714-54cf37749133)

In [7]:
from pyspark.sql.functions import col
sorted_df = df.sort(col("Price").desc())
display(sorted_df)

StatementMeta(sparkpool, 1, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4c040ea2-e36f-4134-a3eb-1116a8113526)

In [None]:
#Using Azure Syanpse we can link our storage account onto Synape workspace via the use of role assignments
# We can give access to users defined in Azure AD permissions onto ADLs account
# Give 'storage blob reader' permission for 'adlsraeez' storage account to 'mohammadraeez.mec@gmail.com' user
# If we log in to Syanpse using this acccount we can access ADLS contianer


#below code reads data in the container adlsraeez/parquet_adf/log.parquet

In [8]:
df = spark.read.load('abfss://parquet-adf@adlsraeez.dfs.core.windows.net/Log.parquet', format='parquet')
display(df)

StatementMeta(sparkpool, 1, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3f028bd0-94d4-40b3-90a1-ab99973add71)

In [9]:
#Choose only columns of interest
display(df.select(col('Correlationid'), col('OperationName'), col('Resourcegroup')))

StatementMeta(sparkpool, 1, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6ed671ce-27d3-41c9-a2f3-7f81e845a0bf)

In [None]:
#filtering using "where"
display(df.where(df['Resourcegroup'] == "app-grp"))

In [11]:
#FILTER
display(df.filter(col("Resourcegroup").isNull()))
display(df.filter(col("Resourcegroup").isNull()).count())

StatementMeta(sparkpool, 1, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2b96aa2c-9977-4d63-9bcd-210ddb88f205)

191

In [12]:
# Group by
summary_df = df.groupBy("Resourcegroup").count()
display(summary_df)

StatementMeta(sparkpool, 1, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5848fe46-5895-4057-965e-360d0ecd9b1a)

In [None]:
#Saving to a delta table
tablepath = "/delta/ActivityLog"
df.write.format("delta").save(tablepath)

In [None]:
#The location of this delta table (tablepath) is inside the container which is attached to this synapse workspace (ie 'synapseadlsraeez' storage account)
# When you save as delta table, the underlying table is stored as parquet files

In [None]:
#reading from delta table
ActivityLogDf = spark.read.format("delta").load(tablepath)
display(ActivityLogDf)

In [13]:
# Temp views
df.createOrReplaceTempView("logdata")

StatementMeta(sparkpool, 1, 15, Finished, Available, Finished)

In [17]:
# now run sql queries against this temp view
sqlResult = spark.sql("SELECT * FROM logdata")
# sqlResult.show()
display(sqlResult)

StatementMeta(sparkpool, 1, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d1e67a90-dd1a-4d5a-80f6-01045d3d59d1)

In [20]:
sql2 = spark.sql("SELECT OperationName, count(OperationName) from logdata group by OperationName")
display(sql2)

StatementMeta(sparkpool, 1, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3baf14ed-ea91-47ae-bee6-468117c1146b)

In [21]:
%%sql
-- Changed the lang to SQL
SELECT OperationName, count(OperationName) from logdata group by OperationName

StatementMeta(sparkpool, 1, 23, Finished, Available, Finished)

<Spark SQL result set with 217 rows and 2 fields>

In [None]:
# In the below codes, we take data from ADLS and put them onto a table in Synapse dedicated SQL pool

In [28]:
%%spark
//using SCALA
//Azure SYnapse Dedicated Pool Connector for Spark is a special connector that allows you to read and write
//data from your tables in your dedicated SQL pool
val CsvDf = spark.read.format("csv").option("header","true").load("abfss://csv@adlsraeez.dfs.core.windows.net/Log.csv")
display(CsvDf)

StatementMeta(sparkpool, 1, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c6cb783a-3296-44f0-a1ae-41c1717633ed)


CsvDf: org.apache.spark.sql.DataFrame = [Correlation id: string, Operation name: string ... 9 more fields]


In [30]:
%%spark
CsvDf.schema

StatementMeta(sparkpool, 1, 32, Finished, Available, Finished)

res18: org.apache.spark.sql.types.StructType = StructType(StructField(Correlation id,StringType,true),StructField(Operation name,StringType,true),StructField(Status,StringType,true),StructField(Event category,StringType,true),StructField(Level,StringType,true),StructField(Time,StringType,true),StructField(Subscription,StringType,true),StructField(Event initiated by,StringType,true),StructField(Resource type,StringType,true),StructField(Resource group,StringType,true),StructField(Resource,StringType,true))


In [32]:
%%spark
// The above wrongly maps the data type 
// so give a custom shcema when you read
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val dataSchema = StructType(Array(
    StructField("Correlationid", StringType, true),
    StructField("Operationname", StringType, true),
    StructField("Status", StringType, true),
    StructField("Eventcategory", StringType, true),
    StructField("Level", StringType, true),
    StructField("Time", TimestampType, true),
    StructField("Subscription", StringType, true),
    StructField("Eventinitiatedby", StringType, true),
    StructField("Resourcetype", StringType, true),
    StructField("Resourcegroup", StringType, true),
    StructField("Resource", StringType, true)
))
val CsvDf = spark.read.format("csv").option("header","true").schema(dataSchema).load("abfss://csv@adlsraeez.dfs.core.windows.net/Log.csv")

StatementMeta(sparkpool, 1, 34, Finished, Available, Finished)

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Correlationid,StringType,true),StructField(Operationname,StringType,true),StructField(Status,StringType,true),StructField(Eventcategory,StringType,true),StructField(Level,StringType,true),StructField(Time,TimestampType,true),StructField(Subscription,StringType,true),StructField(Eventinitiatedby,StringType,true),StructField(Resourcetype,StringType,true),StructField(Resourcegroup,StringType,true),StructField(Resource,StringType,true))
CsvDf: org.apache.spark.sql.DataFrame = [Correlationid: string, Operationname: string ... 9 more fields]


In [34]:
%%spark
CsvDf.schema

StatementMeta(sparkpool, 1, 36, Finished, Available, Finished)

res23: org.apache.spark.sql.types.StructType = StructType(StructField(Correlationid,StringType,true),StructField(Operationname,StringType,true),StructField(Status,StringType,true),StructField(Eventcategory,StringType,true),StructField(Level,StringType,true),StructField(Time,TimestampType,true),StructField(Subscription,StringType,true),StructField(Eventinitiatedby,StringType,true),StructField(Resourcetype,StringType,true),StructField(Resourcegroup,StringType,true),StructField(Resource,StringType,true))


In [40]:
%%spark
//Now we can write the df to the Synapse table using basic authentication method
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "enter the synapse sql engpoint",
                                                Constants.USER -> "enter username here",
                                                Constants.PASSWORD -> " ",
                                                Constants.DATA_SOURCE -> "the taget database",
                                                Constants.TEMP_FOLDER -> "ADLS location for staging folder",
                                                Constants.STAGING_STORAGE_ACCOUNT_KEY -> "Storage Account key ")
// Constants.STAGING_STORAGE_ACCOUNT_KEY = key of storage account
// Constants.TEMP_FOLDER = staging folder

import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
CsvDf.write
        .options(writeOptionsWithBasicAuth)
        .mode(SaveMode.Overwrite)
        .synapsesql(tableName = "datapool.dbo.logdata_parquet",
                    tableType = Constants.INTERNAL,
                    location = None)

// VERY IMP: For this cell to work we need to give 'Storage Blob Contributor' role for the storage accoubnt


StatementMeta(sparkpool, 1, 42, Finished, Available, Finished)

writeOptionsWithBasicAuth: Map[String,String] = Map(logical_server -> dpraeez.sql.azuresynapse.net, temp_folder -> abfss://staging@adlsraeez.dfs.core.windows.net, staging_storage_acount_key -> JYgP7N+JaMow0Kt1GYi7qkKNZ5MgdmdIfaSVQ2rP+KuzsA62+AojZYtmDJEki+mO6lWnkaoCbM2V+AStWVhLDQ==, data_source -> datapool, user -> sqladmin, password -> Raeez@212121)
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


In [42]:
%%sql
-- create a databse and table from Spark Pool
-- This creates a lake database
CREATE DATABASE internaldb

StatementMeta(sparkpool, 1, 44, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [43]:
%%sql
USE internaldb

StatementMeta(sparkpool, 1, 45, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [44]:
%%sql
CREATE TABLE internaldb.customer(Id int, name varchar(100)) USING Parquet

StatementMeta(sparkpool, 1, 46, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [45]:
%%sql
INSERT INTO internaldb.customer VALUES(1, 'UserA')

StatementMeta(sparkpool, 1, 47, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [46]:
%%sql
SELECT * FROM customer

StatementMeta(sparkpool, 1, 48, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 2 fields>

In [47]:
%%pyspark
df = spark.read.load("abfss://csv@adlsraeez.dfs.core.windows.net/Log.csv", format="csv", header=True)
df.write.mode("overwrite").saveAsTable("internaldb.logdatanew")

StatementMeta(sparkpool, 1, 49, Finished, Available, Finished)

In [48]:
%%sql
SELECT * FROM internaldb.logdatanew

StatementMeta(sparkpool, 1, 50, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 11 fields>