In [1]:
1+1

res8: Int = 2


# Progress indicator

In [2]:
val df = spark.read.format("csv").option("header","true").load("Files/online_retail.csv")
display(df)

df: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]


# Resource Utilization

### 1. Run the preparation code below first:

In [5]:
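// Create some fake data and a task definition for the demonstration below.
import java.util.concurrent.TimeUnit
import scala.util.Random

// 32 random integers; only the number of elements matters for the timing.
val data = Array.fill(32)(Random.nextInt(32))

// Counts the elements in a partition, sleeps 10 seconds per element, and returns
// the count, so each task's duration grows with the size of its partition.
def calculation(iter: Iterator[Int]): Iterator[Int] = {
    val sum = iter.map(_ => 1).sum
    TimeUnit.SECONDS.sleep(10*sum)
    Iterator(sum)
}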

import java.util.concurrent.TimeUnit
import scala.util.Random
data: Array[Int] = Array(26, 5, 17, 3, 16, 8, 14, 12, 5, 9, 6, 15, 14, 5, 1, 21, 25, 7, 5, 29, 19, 26, 20, 17, 18, 2, 12, 7, 2, 19, 31, 23)
calculation: (iter: Iterator[Int])Iterator[Int]


### 2. Run a job with 10 partitions. In the Resource usage tab, we can see that the job does not use all of the available cores, which means we should increase parallelism to make full use of the executor resources.

In [6]:
// Run a simple job: 32 elements in 10 partitions, so at most 10 tasks run at once.
// Each task sleeps 10 seconds per element, and the two 4-element partitions finish last.
val rdd1 = sc.parallelize(data, 10)
val results1 = rdd1.mapPartitions(calculation).collect()
println(results1.mkString(", "))

3, 3, 3, 3, 4, 3, 3, 3, 3, 4
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at <console>:30
results1: Array[Int] = Array(3, 3, 3, 3, 4, 3, 3, 3, 3, 4)
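The per-partition counts in results1 follow from how parallelize slices a local collection. The Spark-free sketch below is a simplified model that assumes slice i covers indices [i * n / numSlices, (i + 1) * n / numSlices); under that assumption it reproduces the counts printed above. SliceModel and sliceSizes are hypothetical names used only for illustration.

```scala
object SliceModel {
  // Simplified model of sc.parallelize(seq, numSlices) for a local collection of n elements:
  // slice i covers indices [i * n / numSlices, (i + 1) * n / numSlices).
  def sliceSizes(n: Int, numSlices: Int): Seq[Int] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * n) / numSlices).toInt
      val end = (((i + 1).toLong * n) / numSlices).toInt
      end - start
    }

  def main(args: Array[String]): Unit = {
    // 32 elements in 10 slices: the counts that calculation returns in results1.
    println(sliceSizes(32, 10).mkString(", "))
    // 32 elements in 32 slices: one element per partition.
    println(sliceSizes(32, 32).mkString(", "))
  }
}
```

With 10 slices, two partitions hold 4 elements and keep their tasks busy for 40 seconds, while the other eight finish after 30 seconds.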


### 3. Run the same job again, but with more partitions. Now we can see that all cores are used.

In [7]:
// Run the same job again, but with 32 partitions (one element each).
// Now every task sleeps 10 seconds and all cores are used.
val rdd2 = sc.parallelize(data, 32)
val results2 = rdd2.mapPartitions(calculation).collect()
println(results2.mkString(", "))

1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[82] at parallelize at <console>:30
results2: Array[Int] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
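To see why more partitions help, the sketch below models the two runs as greedy list scheduling: each task takes 10 seconds per element (as calculation sleeps) and starts on whichever core frees up first. It assumes a hypothetical pool of 16 cores with one core per task, because the notebook does not state the real core count, and it ignores scheduler overhead. MakespanModel and its members are hypothetical names.

```scala
object MakespanModel {
  // Greedy list scheduling: each task starts on the core that becomes free first.
  def makespan(taskSeconds: Seq[Int], cores: Int): Int = {
    val freeAt = Array.fill(cores)(0)
    for (t <- taskSeconds) {
      val core = freeAt.indices.minBy(i => freeAt(i))
      freeAt(core) += t
    }
    freeAt.max
  }

  // Busy core-seconds divided by the core-seconds available while the job runs.
  def utilization(taskSeconds: Seq[Int], cores: Int): Double =
    taskSeconds.sum.toDouble / (makespan(taskSeconds, cores).toDouble * cores)

  // Hypothetical pool size; the notebook does not state the real core count.
  val cores = 16
  // Task durations: 10 seconds per element, using the counts in results1 and results2.
  val tenPartitions: Seq[Int] = Seq(3, 3, 3, 3, 4, 3, 3, 3, 3, 4).map(_ * 10)
  val thirtyTwoPartitions: Seq[Int] = Seq.fill(32)(1).map(_ * 10)

  def main(args: Array[String]): Unit = {
    println(s"10 partitions: ${makespan(tenPartitions, cores)} s, utilization ${utilization(tenPartitions, cores)}")
    println(s"32 partitions: ${makespan(thirtyTwoPartitions, cores)} s, utilization ${utilization(thirtyTwoPartitions, cores)}")
  }
}
```

Under these assumptions the 10-partition run leaves six cores idle and finishes in 40 seconds at 50% utilization, while 32 one-element tasks keep all 16 cores busy and finish in 20 seconds.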


# Spark Advisor

## 1. Error Type

###### Error-type advice usually appears in a cell whose execution failed, but not every failed cell gets error-type advice.
When it does, the advice usually includes a troubleshooting guide (TSG) explaining how to make the cell run successfully.

### 1.1 Spark Job Failed/File Not Found

In [8]:
%%spark
def test(): Unit = {
    val data = 1 to 100000
    val inputRdd = sc.parallelize(data, 3)
    inputRdd.map(number => {
        // Thread.sleep(1000)
        if (number % 5 == 0) {
            throw new java.io.FileNotFoundException("Not found issues")
        }

        if (number % 3 == 0) {
            throw new RuntimeException("Authenticate failure issues")
        }
    }).count()
}

val df = spark.read.format("csv").option("header","true").load("Files/online_retail.csv")
display(df)

test()

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 23.0 failed 4 times, most recent failure: Lost task 1.3 in stage 23.0 (TID 69) (vm-65011780 executor 2): java.io.FileNotFoundException: Not found issues
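The error names task 1 and a FileNotFoundException. Assuming parallelize splits 1 to 100000 into three contiguous slices with the same index formula used for other local collections, the Spark-free sketch below finds the first element in each slice that trips a check in test(). Task 1's first bad element is a multiple of 5, which matches the reported exception, while tasks 0 and 2 hit the RuntimeException first. Which failing task is reported depends on timing, so another run may surface the RuntimeException instead. FirstFailureModel and its methods are hypothetical names.

```scala
object FirstFailureModel {
  // Assumed slicing: slice i covers indices [i * n / numSlices, (i + 1) * n / numSlices).
  def slices(data: IndexedSeq[Int], numSlices: Int): Seq[IndexedSeq[Int]] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * data.length) / numSlices).toInt
      val end = (((i + 1).toLong * data.length) / numSlices).toInt
      data.slice(start, end)
    }

  // Mirrors test(): the multiple-of-5 check runs before the multiple-of-3 check,
  // and a task stops at the first element that throws.
  def firstFailure(partition: Seq[Int]): Option[(Int, String)] =
    partition.collectFirst {
      case n if n % 5 == 0 => (n, "java.io.FileNotFoundException")
      case n if n % 3 == 0 => (n, "java.lang.RuntimeException")
    }

  def main(args: Array[String]): Unit =
    slices(1 to 100000, 3).zipWithIndex.foreach { case (part, task) =>
      println(s"task $task: ${firstFailure(part)}")
    }
}
```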

### 1.2 Path Already Exists Error. The suggested solution: to overwrite the existing path, set mode("overwrite") on the DataFrameWriter.

In [10]:
%%spark
df.write.csv("Files/catalogparquet2csv")

Error: org.apache.spark.sql.AnalysisException: [PATH_ALREADY_EXISTS] Path abfss://4cb9b656-c8f8-485e-a151-e81bb913abc8@msit-onelake.dfs.fabric.microsoft.com/eb9433f1-ec4d-4683-89bd-49b5c4ee819b/Files/catalogparquet2csv already exists. Set mode as "overwrite" to overwrite the existing path.

#### After setting the save mode to Overwrite, the write succeeds.

In [11]:
%%spark
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).csv("Files/catalogparquet2csv")

import org.apache.spark.sql.SaveMode
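The first write failed because DataFrameWriter defaults to SaveMode.ErrorIfExists. As a plain-Scala summary of how the four save modes behave when the target already exists, here is a sketch; the Mode trait, its case objects, and action are hypothetical stand-ins named after Spark's SaveMode values, not Spark's API.

```scala
object SaveModeModel {
  // Hypothetical stand-ins named after org.apache.spark.sql.SaveMode's four values.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode // DataFrameWriter's default
  case object Ignore extends Mode

  // What a write does under each mode, depending on whether the target path exists.
  def action(mode: Mode, pathExists: Boolean): String = (mode, pathExists) match {
    case (_, false)            => "write the data"
    case (ErrorIfExists, true) => "fail because the path already exists"
    case (Overwrite, true)     => "replace the existing data"
    case (Append, true)        => "add the data alongside the existing data"
    case (Ignore, true)        => "skip the write and keep the existing data"
  }

  def main(args: Array[String]): Unit =
    Seq(ErrorIfExists, Overwrite, Append, Ignore).foreach { m =>
      println(s"$m on an existing path: ${action(m, pathExists = true)}")
    }
}
```

In the notebook itself, df.write.mode("overwrite") is the string form of mode(SaveMode.Overwrite).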


## 2. Warning Type

#### Warning-type advice usually does not come with a failed execution. Instead, it flags that some of the configurations or plugins you passed in might not work as expected, or that there is a performance issue such as skew.
The message mostly explains how to make things work as you expected.

### 2.1 Data Skew/Time Skew

In [12]:
%%spark
import org.apache.spark.SparkContext
import scala.util.Random

// Builds a shuffle in which about 40% of all records share the key 1, so one reducer
// receives far more data than the other 199 (data skew). The job runs twice.
def testDataSkew(sc: SparkContext): Unit = {
    val numMappers = 400
    val numKVPairs = 10000
    val valSize = 256
    val numReducers = 200
    val biasPct = 0.4
    val biasCount = numKVPairs * biasPct
    for (run <- 1 to 2) {
        val query = sc.parallelize(0 until numMappers, numMappers).flatMap { _ =>
            val ranGen = new Random
            val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
            for (i <- 0 until numKVPairs) {
                val byteArr = new Array[Byte](valSize)
                ranGen.nextBytes(byteArr)
                // Records 0 to biasCount of every mapper get the hot key 1; the rest get random keys.
                val key = if (i <= biasCount) 1 else ranGen.nextInt(Int.MaxValue)
                arr1(i) = (key, byteArr)
            }
            arr1
        }.groupByKey(numReducers)
        // Run the shuffle; count() returns the number of distinct keys.
        // scalastyle:off println
        println(query.count())
        // scalastyle:on println
        Thread.sleep(1000)
    }
}


testDataSkew(sc)

2398241
2398279
import org.apache.spark.SparkContext
import scala.util.Random
testDataSkew: (sc: org.apache.spark.SparkContext)Unit
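The arithmetic behind this skew can be checked without Spark. The sketch below reuses testDataSkew's constants: because the test is i <= biasCount, each mapper emits 4001 records with key 1, so a single reducer receives 1,600,400 of the 4,000,000 records, about 80 times the 20,000 it would get if keys were spread evenly. It also estimates the printed counts: the other 2,399,600 records draw keys from Int.MaxValue possible values, and the usual birthday approximation for collisions puts the expected number of distinct keys near 2,398,260, close to the 2398241 and 2398279 printed above. SkewModel and its members are hypothetical names.

```scala
object SkewModel {
  // Constants copied from testDataSkew.
  val numMappers = 400
  val numKVPairs = 10000
  val numReducers = 200
  val biasCount: Double = numKVPairs * 0.4

  // Records per mapper that get the hot key 1 (i <= biasCount holds for i = 0 to 4000).
  val hotPerMapper: Int = (0 until numKVPairs).count(i => i <= biasCount)
  val totalRecords: Long = numMappers.toLong * numKVPairs
  val hotRecords: Long = numMappers.toLong * hotPerMapper
  // Records per reducer if keys were spread evenly.
  val evenShare: Double = totalRecords.toDouble / numReducers

  // Birthday approximation: m uniform draws from n values give about m - m(m-1)/(2n) distinct values.
  def expectedDistinct(m: Long, n: Long): Double = m - m.toDouble * (m - 1) / (2.0 * n)

  // Random keys come from nextInt(Int.MaxValue), i.e. Int.MaxValue possible values; +1 for key 1.
  val expectedCount: Double = expectedDistinct(totalRecords - hotRecords, Int.MaxValue.toLong) + 1

  def main(args: Array[String]): Unit = {
    println(s"hot-key records: $hotRecords of $totalRecords (even share per reducer: $evenShare)")
    println(f"expected distinct keys: $expectedCount%.0f")
  }
}
```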


In [24]:
%%spark
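import org.apache.spark.TaskContext
// Time skew: on their first attempt, tasks for partitions 0 to 7 (of 200) sleep 30 ms per record,
// so they run much longer than the rest even though the data is evenly spread.
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  val ctx = TaskContext.get()
  if (ctx.attemptNumber() == 0 && ctx.partitionId() < 8) {
    Thread.sleep(30)
  }
  x
}.repartition(200).map { x =>
  x
}
res.distinct().count()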

import org.apache.spark.TaskContext
res: org.apache.spark.sql.Dataset[Long] = [value: bigint]
res46: Long = 1000000
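Here the records are spread evenly, but the time is not. A back-of-the-envelope sketch, assuming the round-robin repartition(200) leaves roughly 1,000,000 / 200 = 5,000 records per partition: each of the 8 slow tasks spends about 150 seconds asleep, so 4% of the tasks set the stage duration. Because the check uses attemptNumber == 0, a retried or speculative attempt of a slow task would skip the sleep. TimeSkewModel is a hypothetical name.

```scala
object TimeSkewModel {
  val records = 1000L * 1000
  val partitions = 200
  val slowPartitions = 8 // partitionId < 8
  val sleepMsPerRecord = 30L

  // Approximate: round-robin repartitioning spreads records roughly evenly.
  val recordsPerPartition: Long = records / partitions
  val slowTaskSeconds: Long = recordsPerPartition * sleepMsPerRecord / 1000
  val slowTaskFraction: Double = slowPartitions.toDouble / partitions

  def main(args: Array[String]): Unit =
    println(s"$slowPartitions of $partitions tasks ($slowTaskFraction) sleep about $slowTaskSeconds s each")
}
```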


### 2.2 Hint-related warnings

#### (1) Hint not recognized

Create the tables below first:

In [14]:
spark.sql("CREATE TABLE IF NOT EXISTS t1 (str STRING) USING parquet")
spark.sql("CREATE TABLE IF NOT EXISTS t2 (str STRING) USING parquet")
spark.sql("CREATE TABLE IF NOT EXISTS t3 (str STRING) USING parquet")

In [15]:
spark.sql("SELECT /*+ testtest */ * FROM t1")

res30: org.apache.spark.sql.DataFrame = [str: string]


#### (2) Hint overridden

In [16]:
spark.sql("SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.str = t2.str")

res31: org.apache.spark.sql.DataFrame = [str: string, str: string]


#### (3) Couldn't find relation

In [17]:
spark.sql("SELECT /*+ BROADCAST(test1) */ * FROM t1 INNER JOIN t2 ON t1.str = t2.str")

res32: org.apache.spark.sql.DataFrame = [str: string, str: string]


## 3. Info Type

##### Info-type advice usually suggests a way that might improve the execution.

### 3.1 RandomSplit might introduce inconsistency

In [18]:
%%spark
val rdd = sc.parallelize(1 to 1000000)
val rdd2 = rdd.repartition(64)
val Array(train, test) = rdd2.randomSplit(Array(70, 30), 1)
train.takeOrdered(10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[152] at parallelize at <console>:31
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[156] at repartition at <console>:31
train: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[157] at randomSplit at <console>:32
test: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[158] at randomSplit at <console>:32
res33: Array[Int] = Array(2, 3, 4, 5, 6, 8, 9, 10, 13, 14)


#### The results below may not be stable across runs, and the train and test datasets may even overlap.

In [19]:
%%spark
train.takeOrdered(10)

res34: Array[Int] = Array(2, 3, 4, 5, 6, 8, 9, 10, 13, 14)
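Why can the splits disagree? RDD.randomSplit computes each split separately: every split re-reads its input, draws one pseudo-random number per element in iteration order from an identically seeded generator, and keeps the elements whose draws fall in that split's range ([0, 0.7) for train and [0.7, 1.0) for test with weights 70 and 30). Because rdd2 is not cached, each split recomputes repartition(64), whose element order within a partition is not guaranteed to repeat. The plain-Scala sketch below models one partition under those assumptions; it uses scala.util.Random rather than Spark's own sampler and ignores Spark's per-partition seed offset. RandomSplitModel and its members are hypothetical names.

```scala
import scala.util.Random

object RandomSplitModel {
  // Simplified per-partition sampling: one draw per element, in iteration order,
  // keeping the element when the draw falls in [lb, ub).
  def sample(partition: Seq[Int], lb: Double, ub: Double, seed: Long): Vector[Int] = {
    val rng = new Random(seed)
    partition.iterator.filter { _ =>
      val x = rng.nextDouble()
      x >= lb && x < ub
    }.toVector
  }

  // Elements kept by both samples, and elements kept by neither.
  def overlapAndMissing(all: Seq[Int], a: Seq[Int], b: Seq[Int]): (Int, Int) = {
    val (sa, sb) = (a.toSet, b.toSet)
    (all.count(x => sa(x) && sb(x)), all.count(x => !sa(x) && !sb(x)))
  }

  val seed = 1L
  val partition: Vector[Int] = (1 to 1000).toVector
  val train: Vector[Int] = sample(partition, 0.0, 0.7, seed)
  // Test sees the same order as train: the splits are disjoint and complete.
  val testSameOrder: Vector[Int] = sample(partition, 0.7, 1.0, seed)
  // Test sees a different order (reversed here, standing in for a nondeterministic shuffle).
  val testNewOrder: Vector[Int] = sample(partition.reverse, 0.7, 1.0, seed)

  def main(args: Array[String]): Unit = {
    println(s"same order (overlap, missing): ${overlapAndMissing(partition, train, testSameOrder)}")
    println(s"new order (overlap, missing): ${overlapAndMissing(partition, train, testNewOrder)}")
  }
}
```

With a stable order the two samples are disjoint and together cover the partition; once the order changes, some elements land in both samples and an equal number land in neither. A common mitigation is to make the input deterministic before splitting, for example by calling rdd2.cache() before randomSplit.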


### 3.2 DivisionExprAdvise

Division expressions can be reordered to limit the propagation of rounding errors.

For this advice, the advisor checks the optimized logical plan for the following expressions:

A / B * C, A * (B / C), A / B / C, and A / (B / C), where A's data type is Double.

These expressions can be converted to semantically equivalent expressions, for example A / B / C into A / (B * C).
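A small illustration of why the order of operations matters for Double: 1.0 / 49.0 is not exactly representable, so multiplying the rounded quotient back by 49.0 does not recover 1.0, while computing 1.0 * 49.0 first keeps every intermediate value exact. The sketch assumes the rewrite of A / B * C into (A * C) / B, which is one semantically equivalent form (the text above only names A / B / C into A / (B * C)). Reordering is not guaranteed to be more accurate for every input, and A * C can overflow where A / B * C would not. DivisionOrderModel is a hypothetical name.

```scala
object DivisionOrderModel {
  // A / B * C, evaluated left to right as (A / B) * C.
  def divideFirst(a: Double, b: Double, c: Double): Double = a / b * c
  // Mathematically equivalent reordering: (A * C) / B.
  def multiplyFirst(a: Double, b: Double, c: Double): Double = a * c / b

  def main(args: Array[String]): Unit = {
    println(divideFirst(1.0, 49.0, 49.0))
    println(multiplyFirst(1.0, 49.0, 49.0))
  }
}
```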

In [20]:
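// A / B * C where A is a Double: one of the patterns the advisor looks for.
val exprA = "(CAST(id AS DOUBLE) + 1.0D)"
val exprB = "(CAST(id AS DOUBLE) + 2.0D)"
val exprC = "(CAST(id AS DOUBLE) * 3.0D)"
val expr = s"${exprA} / ${exprB} * ${exprC}"
// .as("result") sets an alias on the Dataset; the column keeps the expression as its name.
val df = spark.range(1)
  .selectExpr(expr)
  .as("result")
df.collect()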

exprA: String = (CAST(id AS DOUBLE) + 1.0D)
exprB: String = (CAST(id AS DOUBLE) + 2.0D)
exprC: String = (CAST(id AS DOUBLE) * 3.0D)
expr: String = (CAST(id AS DOUBLE) + 1.0D) / (CAST(id AS DOUBLE) + 2.0D) * (CAST(id AS DOUBLE) * 3.0D)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [(((CAST(id AS DOUBLE) + 1.0) / (CAST(id AS DOUBLE) + 2.0)) * (CAST(id AS DOUBLE) * 3.0)): double]
res35: Array[org.apache.spark.sql.Row] = Array([0.0])
