# Basics of Spark on HDInsight

<a href="http://spark.apache.org/" target="_blank">Apache Spark</a> is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. When you provision a Spark cluster in HDInsight, you provision Azure compute resources with Spark installed and configured. The data to be processed is stored in Azure Blob storage (WASB).

![Spark on HDInsight](https://mysstorage.blob.core.windows.net/notebookimages/overview/SparkArchitecture.png "Spark on HDInsight")

Now that you have created a Spark cluster, this notebook covers some basics of working with Spark on HDInsight. For a detailed discussion of working with Spark, see the [Spark Programming Guide](https://spark.apache.org/docs/2.0.0/programming-guide.html).

----------
## Notebook setup

When using Spark kernel notebooks on HDInsight, there is no need to create a SparkContext or a SparkSession. A SparkSession, which contains the SparkContext, is created for you automatically when you run the first code cell, and the startup progress is printed below that cell. The session is available under the following variable name:
- SparkSession (`spark`)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

Every time you run a cell, your web browser window title shows a **(Busy)** status along with the notebook title. You will also see a solid circle next to the **Spark** text in the top-right corner. After the job completes, the solid circle changes to a hollow circle.

![Status of a Jupyter notebook job](https://mysstorage.blob.core.windows.net/notebookimages/overview/HDI.Spark.Jupyter.Job.Status.Spark.Kernel.png "Status of a Jupyter notebook job")

----
## Configure notebook to use Azure Cosmos DB Spark Connector

The Spark community contributes many packages that extend Spark. These packages might not be available out of the box in the Spark distribution that you are using. Here is an example of how to add one such package, the Azure Cosmos DB Spark Connector, to a notebook using the `%%configure` magic.

Before executing any code, you need to include the Azure DocumentDB JAR and the Azure Cosmos DB Spark Connector JAR. You can do so with the following `%%configure` magic command. The `-f` flag forces the session to be recreated if one has already been started:

In [1]:
%%configure -f
{ "jars": ["wasb:///example/jars/azure-documentdb-1.14.0.jar","wasb:///example/jars/azure-cosmosdb-spark_2.1.0_2.11-1.0.0.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}
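
Once the session has started, it can be useful to confirm that the connector classes are actually visible before running any Cosmos DB code. The cell below is a minimal sketch of such a check. The helper name `isOnClasspath` is hypothetical, and the class name is taken from the imports used later in this notebook. It assumes the JARs listed in the `%%configure` cell end up on the driver's classpath, which may depend on how the cluster is set up.

```scala
import scala.util.Try

// Hypothetical helper: returns true when the named class can be loaded
// by the class loader of the current session.
def isOnClasspath(className: String): Boolean =
  Try(Class.forName(className)).isSuccess

// Class name taken from the connector imports used later in this notebook.
val connectorClass = "com.microsoft.azure.cosmosdb.spark.config.Config"

println(s"$connectorClass loaded: ${isOnClasspath(connectorClass)}")
```

If the check prints `false`, the JAR paths in the `%%configure` cell are a reasonable first thing to verify.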

----
## Connecting Spark to Cosmos DB via azure-cosmosdb-spark

Although the communication transport is a little more complicated, executing a query from Spark against Cosmos DB with `azure-cosmosdb-spark` is significantly faster.

The following snippet shows how to use `azure-cosmosdb-spark` within a Spark session.

In [2]:
// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.SaveMode

// Configure connection to your collection
val cosmosDBReadConfig = Config(Map("Endpoint" -> "https://contosoair-fight-record.documents.azure.com:443/",
"Masterkey" -> "I6lQxqgIdejcmdxYcj70l6bGDPVb5c2qbs8zrS6OpmjNVZ02FZnS9ffKifAwuzRK9kPYFn9Ib9ohoNnen0CDhA==",
"Database" -> "contosoair-flight-record",
"preferredRegions" -> "West US;East US",
"Collection" -> "flight-records", 
"SamplingRatio" -> "1.0"))
 
// Read the collection into a DataFrame and register it as a temporary view
val coll = spark.sqlContext.read.cosmosDB(cosmosDBReadConfig)
coll.createOrReplaceTempView("flightTemp")

// Select the columns of interest and persist them as a table named flightDB
val sqlDF = spark.sql("""
  SELECT Year, Quarter, Month, AirlineID, Carrier,
    OriginAirportID, Origin, OriginCityName, OriginState, OriginStateName,
    DestAirportID, Dest, DestCityName, DestState, DestStateName,
    CRSDepTime, DepTime, DepDelay, DepartureDelayGroups,
    CRSArrTime, ArrTime, ArrDelay, ArrivalDelayGroups,
    Cancelled, CancellationCode, Diverted, Distance, DistanceGroup,
    CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
  FROM flightTemp
""")
sqlDF.write.mode(SaveMode.Overwrite).saveAsTable("flightDB")

Starting Spark application


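| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 1 | application_1511328301925_0005 | spark | idle | Link | Link | ✔ |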


SparkSession available as 'spark'.
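
The read configuration above embeds the master key directly in the notebook. One alternative is to read the key from the environment and build the settings map from it. The cell below is a sketch under assumptions: the variable name `COSMOSDB_MASTER_KEY` and the helpers `requiredEnv` and `cosmosDBSettings` are hypothetical, and how an environment variable reaches the Spark driver depends on your cluster configuration. The map keys and values are the ones used in the cell above.

```scala
// Hypothetical environment variable name; it is not defined by HDInsight or the connector.
val masterKeyEnvVar = "COSMOSDB_MASTER_KEY"

// Hypothetical helper: looks up a variable and fails with a readable message when it is missing.
def requiredEnv(name: String, env: Map[String, String] = sys.env): String =
  env.getOrElse(name, sys.error(s"Environment variable $name is not set"))

// Same settings as in the read configuration above; only the source of the key changes.
def cosmosDBSettings(masterKey: String): Map[String, String] = Map(
  "Endpoint" -> "https://contosoair-fight-record.documents.azure.com:443/",
  "Masterkey" -> masterKey,
  "Database" -> "contosoair-flight-record",
  "preferredRegions" -> "West US;East US",
  "Collection" -> "flight-records",
  "SamplingRatio" -> "1.0"
)

// In the notebook, with the connector imports in scope, the map would be passed on as:
// val cosmosDBReadConfig = Config(cosmosDBSettings(requiredEnv(masterKeyEnvVar)))
```

Keeping the key out of the notebook means the notebook can be shared or checked into source control without exposing the credential.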
