### How to set up the Spark context

Download Spark 2.3.1 (the latest release at the time of writing): https://www.apache.org/dyn/closer.lua/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz

Extract the archive, go inside the extracted directory, and run the spark-shell command. This starts an interactive shell with all the relevant jars on the classpath.

A SparkContext is a client of Spark’s execution environment and it acts as the master of the Spark application. SparkContext sets up internal services and establishes a connection to a Spark execution environment.

You may want to avoid hard-coding certain configurations in a SparkConf, for instance if you’d like to run the same application with different masters or different amounts of memory. Spark allows you to simply create an empty conf:

    val sc = new SparkContext(new SparkConf())

Then, you can supply configuration values at runtime:

    ./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
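
For contrast, here is a minimal sketch of what hard-coding the same settings would look like. It only builds and inspects a `SparkConf` and does not start a context; the name `hardCodedConf` is made up for this illustration, and `new SparkConf(false)` is used so that no defaults are loaded from system properties.

```scala
import org.apache.spark.SparkConf

// Hard-coded equivalent of the spark-submit flags above (illustration only).
// `false` = do not load spark.* defaults from system properties.
val hardCodedConf = new SparkConf(false)
  .setAppName("My app")                    // same as --name "My app"
  .setMaster("local[4]")                   // same as --master local[4]
  .set("spark.eventLog.enabled", "false")  // same as --conf spark.eventLog.enabled=false

// Print the resulting key=value pairs, one per line.
println(hardCodedConf.toDebugString)
```

Every change to these values would require recompiling the application, which is the reason to prefer the empty conf plus runtime flags.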

### A quick word on Spark tools: sbt, spark-submit, spark-shell, pyspark

sbt is the build tool for building Scala applications.

You will need to submit applications to your Spark cluster using spark-submit.

spark-shell helps you understand the code execution flow. It is similar to IPython, but for Spark.

Finally, the bigger question of whether to use Scala. In my view, the question is whether you already have a lot of legacy code in Python, and how comfortable your team is moving into the typed environment of Scala. Do you believe that strict typing is your friend? It does add cognitive load. If you ask me, making the upfront investment in typing will help you in your data debugging process. I highly recommend it if the data you are getting is ambiguous and likely to change over time.

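### Difference between a transformation and an action

In pandas there is no such distinction: every operation runs eagerly, as soon as it is called.

Transformations are executed on demand (lazy computation): they return a new RDD, and nothing runs until an action needs the result.
Ex: filter(), union()

An action returns a non-RDD type (usually your stored value types).
Actions trigger execution, using the lineage graph to load the data into the original RDD and apply the pending transformations.
Ex: count(), first()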
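
The difference can be observed with a small sketch. It assumes a local Spark session (`getOrCreate()` reuses the running one in spark-shell or a notebook), and it uses an accumulator, with the made-up name `seen`, to record how many elements the `filter` function has actually processed.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("lazy-sketch").getOrCreate()
val sc = spark.sparkContext

// Counts how many elements the filter function has processed (illustrative name).
val seen = sc.longAccumulator("elements seen by filter")

val numbers = sc.parallelize(1 to 10)

// Transformations: only recorded in the lineage graph, nothing runs yet.
val evens = numbers.filter { n => seen.add(1); n % 2 == 0 }
val withExtra = evens.union(sc.parallelize(Seq(100)))

val seenBeforeAction: Long = seen.value   // 0: the filter has not been executed

// Action: runs the whole lineage and returns a plain Long to the driver.
val total = withExtra.count()             // 5 even numbers + 1 extra element = 6
val seenAfterAction: Long = seen.value    // 10 in a run without task retries

println(s"before=$seenBeforeAction after=$seenAfterAction total=$total")
```

Calling a second action such as `withExtra.first()` would run the lineage again, which is why caching matters when a result is reused.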

### Difference: creating a pandas DF and a Spark DF.

DataFrames generally refer to a data structure, which is tabular in nature. It represents rows, each of which consists of a number of observations. Rows can have a variety of data formats (heterogeneous), whereas a column can have data of the same data type (homogeneous). DataFrames usually contain some metadata in addition to data; for example, column and row names.

#### Pandas:

![title](img/pandas_read_csv.png)

#### Spark

In [None]:
val house_prices_df = spark.read
    .format("csv")                                    // this is a csv file.
    .option("header", "true")                         // the file contains headers
    .option("inferSchema", true)                      // read the schema
    .load("/home/jovyan/data/house-prices/train.csv") // now load the file.

val melb_data = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", true)
    .load("/home/jovyan/data/melbourne_housing_snapshot/melb_data.csv")

Taking a look at the dataframe.

In [None]:
house_prices_df.show(3)

In [None]:
house_prices_df.printSchema()

### Features of Spark Dataframes

* DataFrames are distributed in nature, which makes them a fault-tolerant and highly available data structure.
* Lazy evaluation is an evaluation strategy that holds the evaluation of an expression until its value is needed, which avoids repeated evaluation. In Spark, it means that transformations are only recorded when they are applied, and execution does not start until an action is triggered.
* DataFrames are immutable in nature: the state of a DataFrame cannot be modified after it is created. We can, however, derive a new DataFrame by applying a transformation, as with RDDs.


### Describing a particular column

#### Pandas

![Describing in pandas](./img/pandas_describe.png)

#### Spark

In [None]:
house_prices_df.describe("MSSubClass").show()

### Get the dataframe shape and columns

#### Pandas

![](./img/pandas_shape_columns.png)

#### Spark

Getting the number of samples is an action and triggers a full job, so be mindful of it on large data.

In [None]:
house_prices_df.count

Getting the number of features is no big deal: it only reads the schema.

In [None]:
house_prices_df
    .columns
    .size

In [None]:
house_prices_df.columns

### Changing the column names of the dataframes

#### Pandas

![](./img/change_column_names.png)

#### Spark

In [None]:
val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema

In [None]:
val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.show

In [None]:
import org.apache.spark.sql.functions.col

val lookup = Map("Id" -> "id", "SalePrice" -> "SalePriceDollars")

val changed_cols_df = house_prices_df.select(
    house_prices_df.columns
    .map(
        c => col(c).as(lookup.getOrElse(c, c))
    ): _*)

In [None]:
changed_cols_df.printSchema

### Unique Values

#### Pandas

![](./img/unique_values.png)

#### Spark

In [None]:
house_prices_df
    .select("MSSubClass")
    .distinct()
    .show()

For large data, when you are only interested in the number of unique values, use countDistinct:

In [None]:
import org.apache.spark.sql.functions.countDistinct

house_prices_df.select(countDistinct('MSSubClass)).show(3)

You can also compute an approximate distinct count, which is cheaper on large data, using approx_count_distinct.
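
The approximate count mentioned above can be sketched with `approx_count_distinct` from `org.apache.spark.sql.functions`. The small `classes` DataFrame is a made-up stand-in for a column such as MSSubClass, and the 0.05 argument is an assumed tolerance (the maximum relative standard deviation allowed).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{approx_count_distinct, countDistinct}

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("distinct-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for a column such as MSSubClass.
val classes = Seq(20, 60, 20, 70, 60, 20, 120).toDF("MSSubClass")

val distinctCounts = classes.select(
    countDistinct($"MSSubClass").as("exact"),                  // exact number of distinct values
    approx_count_distinct($"MSSubClass", 0.05).as("approx"))   // estimate within the given tolerance

distinctCounts.show()
```

On a tiny input both columns are expected to be close or identical; the estimate pays off on columns with millions of distinct values.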

### Value Counts

ref: https://stackoverflow.com/a/37949565/5417164

#### Pandas

![](./img/value_counts.png)

#### Spark

In [None]:
import org.apache.spark.sql.functions.count

house_prices_df
    .groupBy("MSSubClass")      // group by the class
    .count()                    // count the rows per class; this adds a dedicated "count" column
    .orderBy($"count".desc)     // order by the count column, largest first
    .show()

### Order by and group by

#### Pandas

![](./img/orderbuy_grpby.png)

#### Spark

In [None]:
import org.apache.spark.sql.functions.count

house_prices_df
    .groupBy($"MSSubClass")                   // one group per MSSubClass value
    .agg(count("*") as "numOccurances")       // SQL: SELECT MSSubClass, COUNT(*) AS numOccurances FROM house_prices_df GROUP BY MSSubClass
    .orderBy($"numOccurances".desc).show()    // most frequent classes first

### Filtering data

#### Pandas

![](./img/filtering.png)

#### Spark

In [None]:
val highSubClass = house_prices_df
    .filter($"MSSubClass" > 100)

In [None]:
highSubClass.show(3)

### Membership in dataframe

#### Pandas

![](./img/membership.png)

#### Spark

In [None]:
val presentList = List("20","60") 
val nopresentList = List("20000") 
val validMembership = house_prices_df
    .filter($"MSSubClass"
            .isin(presentList:_*))

In [None]:
val invalidMembership = house_prices_df
    .filter($"MSSubClass"
            .isin(nopresentList:_*))

The count for validMembership below is the sum of the value counts of classes 20 and 60 seen earlier (536 + 299).

In [None]:
validMembership.count

In [None]:
invalidMembership.count

### Missing value imputation

refs: https://stackoverflow.com/a/40059453/5417164 

https://medium.com/@mrpowers/dealing-with-null-in-spark-cfdbb12f231e

In [None]:
melb_data.printSchema()

In [None]:
melb_data.show(3)

#### Pandas

![](./img/fillna.png)

Similar to pandas, you can replace the NA values in Spark:

In [None]:
val imputed_melb_data = melb_data
    .na
    .fill(1964.0, Seq("YearBuilt"))

In [None]:
imputed_melb_data.show(5)

Using an imputer

#### Pandas

![](./img/imputer.png)

#### Spark

In [None]:
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions.col

val features_in_focus = Array("Rooms", "Bathroom", "Landsize", "BuildingArea",
                              "YearBuilt", "Lattitude", "Longtitude")
val features_in_focus_imputed = features_in_focus.map(c => s"${c}_imputed")

val imputer = new Imputer()
  .setInputCols(features_in_focus)
  .setOutputCols(features_in_focus_imputed)
  .setStrategy("mean")

// Expected to fail here: Rooms is an Integer column (see the note below).
val imputed_melb_data = imputer.fit(melb_data).transform(melb_data)

imputed_melb_data.select(features_in_focus_imputed.map(name => col(name)):_*).show(5)

As you can see above, the input features must be of type Double or Float. The Rooms feature is of type Integer, so we need to convert it first.

In [None]:
import org.apache.spark.sql.types.DoubleType

val melb_data2 = melb_data
    .withColumn("_Rooms", melb_data("Rooms").cast(DoubleType))
    .drop("Rooms")
    .withColumnRenamed("_Rooms", "Rooms")

In [None]:
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions.col

val features_in_focus = Array("Rooms", "Bathroom", "Landsize", "BuildingArea",
                              "YearBuilt", "Lattitude", "Longtitude")
val features_in_focus_imputed = features_in_focus.map(c => s"${c}_imputed")

val imputer = new Imputer()
  .setInputCols(features_in_focus)
  .setOutputCols(features_in_focus_imputed)
  .setStrategy("mean")

val imputed_melb_data = imputer.fit(melb_data2).transform(melb_data2)

imputed_melb_data.select(features_in_focus_imputed.map(name => col(name)):_*).show(5)

### Discretization and Binning

#### Pandas

![](./img/binning.png)

#### Spark

In [None]:
import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

println("showing the dataframe")
dataFrame.show()

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

In [None]:
val data = Array(20.0, 22.0, 25.0, 27.0, 21.0, 23.0, 37.0, 31.0, 61.0, 45.0, 41.0, 32.0)
val bins = Array(18.0, 25.0, 35.0, 60.0, 100.0)

val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

println("showing the dataframe")
dataFrame.show()

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(bins)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

### Getting a particular value

ref: https://stackoverflow.com/a/35720457/5417164

#### Pandas

![](./img/iloc.png)

#### Spark

In [None]:
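val result = house_prices_df
    .filter($"Id" === 1)      // keep the row whose Id (the first column) is 1
    .select("MSSubClass")     // keep only the column of interest
    .collect()                // action: bring the result to the driver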

In [None]:
result

### Sorting

ref: https://stackoverflow.com/a/32052881/5417164

#### Pandas

![](./img/sorting.png)

#### Spark

In [None]:
import org.apache.spark.sql.functions._
val sortedbyMsSubclass = house_prices_df
    .sort(
        desc("MSSubClass"))

In [None]:
val columnNames = Seq("MSSubClass", "SalePrice")
sortedbyMsSubclass.select(columnNames.map(c => col(c)): _*).show(3)

### Reshaping and Pivoting

#### Pandas

![](./img/pivot.png)

#### Spark

In [None]:
// create RDD for products
val data = sc.parallelize(Seq(
    ("memories","book","q1",10),
    ("dreams","book","q2",20),
    ("reflections","book","q3",30),
    ("how to build a house","book","q4",40),
    ("wonderful life","music","q1",10),
    ("million miles","music","q2",20),
    ("run away","music","q3",30),
    ("mind and body","music","q4",40)
))

// convert the RDD to DataFrame
val df_products = spark.createDataFrame(data).toDF("product","category","quarter","profit")
df_products.show()

// index column : category
// value column : profit
// pivot column : quarter
// agg function : sum

// apply pivot on the DataFrame
df_products
    .groupBy("category")
    .pivot("quarter")
    .sum("profit")
    .show()
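
A variation worth knowing, as a sketch: `pivot` also accepts the list of pivot values explicitly, which fixes the order of the output columns and saves Spark from computing the distinct quarters first. The `sales` DataFrame below is a made-up reduction of the product data above.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: (category, quarter, profit)
val sales = Seq(
    ("book", "q1", 10), ("book", "q2", 20), ("book", "q3", 30), ("book", "q4", 40),
    ("music", "q1", 10), ("music", "q2", 20), ("music", "q3", 30), ("music", "q4", 40)
).toDF("category", "quarter", "profit")

val pivoted = sales
    .groupBy("category")                             // index column
    .pivot("quarter", Seq("q1", "q2", "q3", "q4"))   // pivot column with explicit values
    .sum("profit")                                   // aggregated value column

pivoted.show()
```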

### Merges and Joins

#### Pandas

![](./img/merges.png)

#### Spark

In [None]:
val llist = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23",10))
val left = llist.toDF("name","date","duration")
val right = Seq(("alice", 100),("bob", 23)).toDF("name","upload")

val df = left
    .join(right, Seq("name"))
df.show()
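
The join above is an inner join, which is the default. As a sketch of the counterpart to the `how` argument of a pandas merge, the join type can be passed as a third argument. The `durations` and `uploads` DataFrames are made-up data in which "carol" has no upload.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: carol appears only on the left side.
val durations = Seq(("bob", 4), ("alice", 10), ("carol", 7)).toDF("name", "duration")
val uploads = Seq(("alice", 100), ("bob", 23)).toDF("name", "upload")

// Inner join (default): only names present on both sides.
val innerJoined = durations.join(uploads, Seq("name"))

// Left outer join: every row from the left side, null where the right side has no match.
val leftJoined = durations.join(uploads, Seq("name"), "left_outer")

leftJoined.show()
```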

### Concatenating and appending to the dataframe

#### Pandas

![](./img/append.png)

#### Spark

In [None]:
val llist = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23",10))
val arr = llist.toDF("name","date","duration")
val appended = arr.union(
    arr.toDF())
appended.show()

### Function application, transformations and mapping

By using user defined functions

In [None]:
import org.apache.spark.sql.functions.udf

In [None]:
def morePrecision(price: Integer): Float = price.toFloat

// we use the method name followed by a "_" to indicate we want a reference
// to the method, not call it
val morePrecisionUdf = udf(morePrecision _)

val converted_df = house_prices_df.select(
    morePrecisionUdf(house_prices_df("SalePrice")))

In [None]:
converted_df.show(3)

Applying the UDF on the same DataFrame, i.e. creating a new feature by transforming another column.

In [None]:
house_prices_df
    .withColumn("MorePrecisionSalePrice", morePrecisionUdf('SalePrice))
    .show(3)

Applying some transformation on all the data.

In [None]:
import org.apache.spark.sql.functions.{col, upper}

val df = sc.parallelize(
  Seq(("a", "B", "c"), ("D", "e", "F"))).toDF("x", "y", "z")
df.select(df.columns.map(c => upper(col(c)).alias(c)): _*).show

### A look at datasets

The Datasets API provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) together with the benefits of Spark SQL’s optimized execution engine. You can define a Dataset of JVM objects and then manipulate it using functional transformations (map, flatMap, filter, and so on), similar to an RDD. The benefit is that, unlike RDDs, these transformations are applied on a structured and strongly typed distributed collection, which allows Spark to leverage Spark SQL’s execution engine for optimization.

In [None]:
import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()
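
The strong typing is easier to see with a Dataset of case class instances. This is a sketch meant for spark-shell or a notebook cell (in compiled code the case class has to be defined at top level); the `House` type and its values are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

// Hypothetical record type with illustrative values.
case class House(id: Int, subClass: Int, salePrice: Double)

val houses = Seq(
    House(1, 60, 208500.0),
    House(2, 20, 181500.0),
    House(3, 60, 223500.0)
).toDS()

// Typed lambdas: field names and types are checked by the compiler,
// so a typo such as _.salesPrice fails at compile time, not at run time.
val expensiveIds = houses
    .filter(_.salePrice > 200000.0)
    .map(_.id)
    .collect()

println(expensiveIds.mkString(", "))
```

With a plain DataFrame the same filter would reference the column by a string name, and a misspelled name would only surface when the query is analyzed.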

### Additional Capabilities: Streaming.

In [None]:
import org.apache.spark.sql.types.{StringType, StructType, StructField, IntegerType}

val userSchema = new StructType().add("Suburb", "string").add("Address", "string")
val csvDF = spark
  .readStream
  .option("sep", ",")
  .option("header", "true") // skip the header row of each csv file
  .schema(userSchema)      // Specify schema of the csv files
  .option("maxFilesPerTrigger", 1)
  .csv("/home/jovyan/data/melbourne_housing_snapshot/")

In [None]:
// Count rows per suburb as new files arrive
import org.apache.spark.sql.functions._

val streamingCountsDF =
  csvDF
    .groupBy($"Suburb")   // no time window: the schema has no timestamp column to window on
    .count()

streamingCountsDF.isStreaming

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "1")  // keep the size of shuffles small

val query =
  streamingCountsDF
    .writeStream
    .format("memory")        // memory = store in-memory table (for testing only in Spark 2.0)
    .queryName("counts")     // counts = name of the in-memory table
    .outputMode("complete")  // complete = all the counts should be in the table
    .start()

In [None]:
spark.sql("select * from counts").show()

### A sample machine learning dataset.

In [None]:
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.evaluation.RegressionEvaluator

val bike_sharing_df = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", true)
    .load("/home/jovyan/data/bike-sharing/hour.csv")

In [None]:
// "cnt" is the target, so it must not be part of the feature vector
val featureCols = Array("season", "yr", "mnth", "hr", 
                        "holiday", "weekday", "workingday",
                        "weathersit", "temp", "atemp",
                        "hum", "windspeed")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val dataDF = assembler.transform(bike_sharing_df)
val dataDF1 = dataDF.withColumnRenamed("cnt", "label")

val Array(train, test) = dataDF1.randomSplit(Array(0.8, 0.2))

val lr = new LinearRegression()
    .setMaxIter(1000)
    .setRegParam(0.3)
    .setElasticNetParam(0.8)

//Fit the model
val lrModel = lr.fit(train)
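
The `RegressionEvaluator` imported above is a natural next step for scoring the fitted model. The sketch below shows the pattern on made-up toy data that follows y = 2x + 1 exactly, so it runs on its own; all names prefixed with `toy` are hypothetical.

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the running session in spark-shell or a notebook.
val spark = SparkSession.builder().master("local[*]").appName("lr-eval-sketch").getOrCreate()
import spark.implicits._

// Hypothetical toy data following y = 2x + 1 exactly.
val toy = (1 to 20).map(i => (i.toDouble, 2.0 * i + 1.0)).toDF("x", "label")

// Assemble the single input column into the "features" vector column.
val toyFeatures = new VectorAssembler()
    .setInputCols(Array("x"))
    .setOutputCol("features")
    .transform(toy)

// Fit with default parameters (no regularization) and predict on the same data.
val toyModel = new LinearRegression().fit(toyFeatures)
val toyPredictions = toyModel.transform(toyFeatures)

// Root mean squared error between the "label" and "prediction" columns.
val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

val rmse = evaluator.evaluate(toyPredictions)
println(f"RMSE on toy data: $rmse%.4f")
```

With the notebook's own objects, the same evaluator would be applied to `lrModel.transform(test)` to get the error on the held-out 20% split.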