### How to set up the Spark context

Download Spark (this session uses version 2.3.1): https://www.apache.org/dyn/closer.lua/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz

Extract the archive, go inside the extracted directory and run the ./bin/spark-shell command. The binary distribution already bundles the jars Spark needs.

A SparkContext is a client of Spark’s execution environment and it acts as the master of the Spark application. SparkContext sets up internal services and establishes a connection to a Spark execution environment.

You may want to avoid hard-coding certain configurations in a SparkConf, for instance if you’d like to run the same application with different masters or different amounts of memory. Spark allows you to simply create an empty conf:

    val sc = new SparkContext(new SparkConf())

Then, you can supply configuration values at runtime:

    ./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

### A quick word on Spark tools: sbt, spark-submit, spark-shell, pyspark

sbt is a build tool for Scala applications.

You submit applications to your Spark cluster using spark-submit.

spark-shell is an interactive Scala shell that helps you understand the code execution flow. It is to Spark what IPython is to Python. pyspark is the equivalent shell for Python.

Finally, the bigger question: should you use Scala? In my view it comes down to two things: how much legacy Python code you already have, and how comfortable your team is with moving into Scala's typed environment. Strict typing adds some cognitive load, so you need to believe that it is your friend. In my opinion, the upfront investment in typing pays off when you debug your data. I highly recommend it if the data you receive is ambiguous and likely to change over time.

### Difference between a transformation and an action

In pandas every operation is executed eagerly, so there is no such distinction. In Spark the distinction matters.

Transformations are lazy: they return a new RDD and are only executed when an action needs their result.
Ex: filter(), union()

An action returns a non-RDD type (usually a plain value or a collection of your stored value types).
Actions trigger execution: Spark uses the lineage graph to compute the RDDs that the result depends on.
Ex: count(), first()
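
The difference can be seen on a toy RDD. This is a minimal sketch, assuming a local session (in spark-shell or a notebook, `getOrCreate()` simply returns the existing one); the names `numbers`, `evens` and `combined` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("lazy-demo").getOrCreate()
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 10)

// Transformations: these only record lineage, no job runs yet.
val evens = numbers.filter(_ % 2 == 0)
val combined = evens.union(sc.parallelize(Seq(100, 200)))

// Actions: each call triggers a job and returns a plain Scala value.
val howMany: Long = combined.count()   // 5 even numbers + 2 extra values = 7
val firstOne: Int = combined.first()   // 2

// toDebugString prints the lineage that Spark replays when an action runs.
println(combined.toDebugString)
```

Nothing is computed until `count()` is called, and `first()` runs the lineage again unless the RDD has been cached.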

### Things we will be looking at in this hack session.

* Creating dataframes.
* Get the dataframe shape and columns.
* Changing the column names of the dataframes.
* Orderby and groupby.
* Filtering data.
* Membership in dataframe.
* Missing value imputation.
* Getting a particular value.
* Reshaping and Pivoting.
* Function application, transformations and mapping.

### Things to note.

Similarities and differences between the APIs of pandas DataFrames and Spark DataFrames.

### Difference: creating a pandas DF and a Spark DF.

DataFrames generally refer to a data structure, which is tabular in nature. It represents rows, each of which consists of a number of observations. Rows can have a variety of data formats (heterogeneous), whereas a column can have data of the same data type (homogeneous). DataFrames usually contain some metadata in addition to data; for example, column and row names.
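
Before reading files, it can help to build a tiny DataFrame by hand, roughly what `pd.DataFrame({...})` does in pandas. This is a sketch with made-up rows; `toyDf` and its values are illustrative only and are not taken from the datasets used below.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("df-demo").getOrCreate()
import spark.implicits._   // enables .toDF on local Scala collections

// Hypothetical toy rows: each row mixes types, each column has a single type.
val toyDf = Seq(
  (1, "CollgCr", 208500.0),
  (2, "Veenker", 181500.0)
).toDF("Id", "Neighborhood", "SalePrice")

toyDf.printSchema()   // column names and types are the metadata
toyDf.show()
```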

In [None]:
val house_prices_df = spark.read
    .format("csv")                                    // this is a csv file.
    .option("header", "true")                         // the file contains headers
    .option("inferSchema", true)                      // read the schema
    .load("/home/jovyan/data/house-prices/train.csv") // now load the file.

val melb_data = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", true)
    .load("/home/jovyan/data/melbourne_housing_snapshot/melb_data.csv")


Taking a look at the dataframe.

In [None]:
house_prices_df.show(3)

In [None]:
house_prices_df.printSchema()

### Describing a particular column

In [None]:
house_prices_df.describe("MSSubClass").show()

### Similarities in API

| pandas         | Spark                               |
|----------------|-------------------------------------|
| pd.read_csv    | spark.read.format("csv").load       |
| df.dtypes      | df.printSchema                      |
| df.describe    | df.describe                         |
| series.unique  | df.select(col).distinct             |
| df.groupby     | df.groupBy                          |
| series.isin    | column.isin                         |
| df.fillna      | df.na.fill                          |
| df.sort_values | df.sort                             |
| df.merge       | df.join                             |
| df.append      | df.union                            |
| df.to_csv      | df.write.format("csv").save(path)   |

## Things that are different

Some quirks and things to look out for.

### Get the dataframe shape and columns

Getting the number of samples (rows) is an action, so it triggers a job over the whole dataset. Be mindful of this on large data.

In [None]:
house_prices_df.count

Getting the number of features (columns) is no big deal: the column names are part of the schema, so no job is run.

In [None]:
house_prices_df
    .columns
    .size

In [None]:
house_prices_df.columns
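
pandas users often miss `df.shape`. A small helper can mimic it; this is a sketch, and `shape` is a hypothetical function, not part of the Spark API. Note that it still pays for a full `count()`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("shape-demo").getOrCreate()
import spark.implicits._

// Hypothetical helper: (number of rows, number of columns).
// count() is an action and scans the data; columns.length only reads the schema.
def shape(df: DataFrame): (Long, Int) = (df.count(), df.columns.length)

// Made-up toy data for illustration.
val toyDf = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "label")
val (rows, cols) = shape(toyDf)   // (3, 2)
```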

### Changing the column names of the dataframes

In [None]:
val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema

In [None]:
val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.show

In [None]:
import org.apache.spark.sql.functions.col

val lookup = Map("Id" -> "id", "SalePrice" -> "SalePriceDollars")

house_prices_df.columns.map(c => col(c).as(lookup.getOrElse(c, c)))

In [None]:
import org.apache.spark.sql.functions.col

val lookup = Map("Id" -> "id", "SalePrice" -> "SalePriceDollars")

val changed_cols_df = house_prices_df.select(
    house_prices_df.columns
    .map(
        c => col(c).as(lookup.getOrElse(c, c))
    ): _*)

In [None]:
changed_cols_df.printSchema
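
When only a few columns change, folding `withColumnRenamed` over the lookup map is an alternative to the `select` above. This is a sketch on a made-up two-row DataFrame standing in for house_prices_df; the values are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("rename-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for house_prices_df with just two columns.
val toyDf = Seq((1, 208500), (2, 181500)).toDF("Id", "SalePrice")

val lookup = Map("Id" -> "id", "SalePrice" -> "SalePriceDollars")

// Apply one rename per map entry; columns not in the map are left untouched.
val renamed = lookup.foldLeft(toyDf) { case (acc, (from, to)) =>
  acc.withColumnRenamed(from, to)
}

renamed.printSchema()
```

The `select` version builds a single projection, while this version adds one projection per renamed column, so the former is probably preferable for wide tables.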

### The groupby -> agg -> finalising pattern

#### Value Counts

ref: https://stackoverflow.com/a/37949565/5417164

In [None]:
import org.apache.spark.sql.functions.count

house_prices_df
    .groupBy("MSSubClass")  // groupby your class
    .count()                // count the values, this should create a dedicated count column
    .orderBy($"count".desc) // order by the count column, largest first
    .show()

### Order by and group by

In [None]:
import org.apache.spark.sql.functions.count

house_prices_df
    .groupBy($"MSSubClass")                   // one group per MSSubClass value
    .agg(count("*") as "numOccurrences")      // SQL: SELECT MSSubClass, COUNT(*) AS numOccurrences FROM house_prices_df GROUP BY MSSubClass
    .orderBy($"numOccurrences".desc).show()

In [None]:
import org.apache.spark.sql.functions.count

house_prices_df
    .groupBy($"MSSubClass")
    .agg(count("*") as "numOccurrences")
    .sort($"numOccurrences".desc).show()      // sort is an alias of orderBy

### Filtering data

In [None]:
val highSubClass = house_prices_df
    .filter($"MSSubClass" > 100)

In [None]:
highSubClass.show(3)

### Membership in dataframe

In [None]:
val presentList = List("20","60") 
val nopresentList = List("20000") 
val validMembership = house_prices_df
    .filter($"MSSubClass"
            .isin(presentList:_*))

In [None]:
val invalidMembership = house_prices_df
    .filter($"MSSubClass"
            .isin(nopresentList:_*))

You can see that the count below is 536 + 299, the sum of the row counts of the two classes in presentList.

In [None]:
validMembership.count

In [None]:
invalidMembership.count

### Missing value imputation

refs: https://stackoverflow.com/a/40059453/5417164 

https://medium.com/@mrpowers/dealing-with-null-in-spark-cfdbb12f231e

In [None]:
melb_data.printSchema()

In [None]:
melb_data.show(3)

Similar to pandas, you can replace the missing (null) values with a constant.

In [None]:
val imputed_melb_data = melb_data
    .na
    .fill(1964.0, Seq("YearBuilt"))

In [None]:
imputed_melb_data.show(5)
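
`na.fill` also accepts a map from column name to default value, and `na.drop` removes incomplete rows, similar to `dropna` in pandas. This is a sketch on made-up rows; the `Type` column and the "unknown" default are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("na-demo").getOrCreate()
import spark.implicits._

// Hypothetical toy rows; None becomes null in the DataFrame.
val rows: Seq[(Option[Double], Option[String])] = Seq(
  (Some(1990.0), Some("h")),
  (None,         Some("u")),
  (Some(2005.0), None)
)
val toyDf = rows.toDF("YearBuilt", "Type")

// One default per column, each matching the column's type.
val defaults: Map[String, Any] = Map("YearBuilt" -> 1964.0, "Type" -> "unknown")
val filled = toyDf.na.fill(defaults)

// Alternatively, drop every row that contains a null.
val complete = toyDf.na.drop()

filled.show()
```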

Using an imputer

In [None]:
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions.col

val features_in_focus = Array("Rooms", "Bathroom", "Landsize")
val features_in_focus_imputed = features_in_focus.map(c => s"${c}_imputed")

val imputer = new Imputer()
  .setInputCols(features_in_focus)
  .setOutputCols(features_in_focus_imputed)
  .setStrategy("mean")

val imputed_melb_data = imputer.fit(melb_data).transform(melb_data)
// val imputed_melb_data = imputer.fit(melb_data2).transform(melb_data2)

imputed_melb_data.select(features_in_focus_imputed.map(name => col(name)):_*).show(5)

As you can see above, the Imputer in this Spark version requires its input columns to be of DoubleType or FloatType. The Rooms feature is of type Integer, so we need to cast it first.

In [None]:
import org.apache.spark.sql.types.DoubleType

val melb_data2 = melb_data
    // withColumn replaces the existing column when the name already exists
    .withColumn("Rooms", melb_data("Rooms").cast(DoubleType))

In [None]:
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions.col

val features_in_focus = Array("Rooms", "Bathroom", "Landsize")
val features_in_focus_imputed = features_in_focus.map(c => s"${c}_imputed")

val imputer = new Imputer()
  .setInputCols(features_in_focus)
  .setOutputCols(features_in_focus_imputed)
  .setStrategy("mean")

val imputed_melb_data = imputer.fit(melb_data2).transform(melb_data2)

imputed_melb_data.select(features_in_focus_imputed.map(name => col(name)):_*).show(5)

### Getting a particular value

ref: https://stackoverflow.com/a/35720457/5417164

In [None]:
val result = house_prices_df
    .filter($"Id" === 1)     // Id is the first column; keep the row whose Id is 1
    .select("MSSubClass")
    .collect()               // action: brings the matching rows to the driver

In [None]:
result(0)(0)

### Reshaping and Pivoting

In [None]:
// create RDD for products
val data = sc.parallelize(Seq(
    ("memories","book","q1",10),
    ("dreams","book","q2",20),
    ("reflections","book","q3",30),
    ("how to build a house","book","q4",40),
    ("wonderful life","music","q1",10),
    ("million miles","music","q2",20),
    ("run away","music","q3",30),
    ("mind and body","music","q4",40)
))

// convert the RDD to DataFrame
val df_products = spark.createDataFrame(data).toDF("product",
                                                   "category",
                                                   "quarter",
                                                   "profit")
df_products.show()

// index column : category
// value column : profit
// pivot column : quarter
// agg function : sum

// apply pivot on the DataFrame
df_products
    .groupBy("category")
    .pivot("quarter")
    .sum("profit")
    .show()

### Working on time series data

In [None]:
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

import org.apache.spark.sql.types.{StringType, StructType, StructField, IntegerType, FloatType}

val schemaStruct = StructType(
    StructField("date", IntegerType) ::
    StructField("low", FloatType) ::
    StructField("high", FloatType) ::
    StructField("open", FloatType) ::
    StructField("close", FloatType) ::
    StructField("volume", FloatType) :: Nil
)

val crypto_df = spark.read
    .format("csv")                                    // this is a csv file.
    .schema(schemaStruct)
    .load("/home/jovyan/data/crypto_data/LTC-USD.csv")

val w = Window.orderBy("date")   // Window is already imported above

val FUTURE_PERIOD_PREDICT = 3

val futureDf = crypto_df.withColumn("future", lead("close", FUTURE_PERIOD_PREDICT, 0).over(w))

futureDf.show(5)
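
To see what `lead` does, and its counterpart `lag` that is imported above, here is a sketch on four made-up price rows. The column names `next_close` and `prev_close` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, lead}

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("window-demo").getOrCreate()
import spark.implicits._

// Hypothetical toy series: (date, close).
val prices = Seq((1, 10.0), (2, 11.0), (3, 9.0), (4, 12.0)).toDF("date", "close")

val byDate = Window.orderBy("date")

val shifted = prices
  .withColumn("next_close", lead("close", 1).over(byDate))  // value one row ahead, null on the last row
  .withColumn("prev_close", lag("close", 1).over(byDate))   // value one row behind, null on the first row

val collected = shifted.orderBy("date").collect()
shifted.orderBy("date").show()
```

Without a default value the missing neighbours are null; the cell above passes 0 as the default instead. A window with no `partitionBy` moves all rows into a single partition, which is fine for small data but can be slow on large datasets.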

In [None]:
import org.apache.spark.sql.functions._

val targetdf = futureDf.withColumn("target", when($"future" > $"close", 1).otherwise(0))

targetdf.show(5)

In [None]:
// Note: Spark writes a directory named out.csv containing part files, not a single CSV file
targetdf.write.format("csv").save("/home/jovyan/data/crypto_data/out.csv")

### Function application, transformations and mapping

Using user-defined functions (UDFs).

In [None]:
import org.apache.spark.sql.functions.udf

In [None]:
def morePrecision(price: Integer): Float = price.toFloat

// we use the method name followed by a "_" to indicate we want a reference
// to the method, not call it
val morePrecisionUdf = udf(morePrecision _)

val converted_df = house_prices_df.select($"SalePrice", 
                                          morePrecisionUdf('SalePrice))

In [None]:
converted_df.show(3)

Applying the UDF to the same DataFrame, i.e. creating a new feature by transforming an existing column.

In [None]:
house_prices_df
    .withColumn("MorePrecisionSalePrice",
                morePrecisionUdf('SalePrice))
    .show(3)
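
For a plain type conversion like this one, the built-in `cast` gives the same result without a UDF. Built-in expressions are visible to Spark's optimizer, whereas a UDF is a black box to it. This is a sketch on made-up values standing in for the SalePrice column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.FloatType

// Assumption: a local session; an existing session is reused if there is one.
val spark = SparkSession.builder().master("local[*]").appName("cast-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the SalePrice column of house_prices_df.
val toyDf = Seq(208500, 181500).toDF("SalePrice")

// Same effect as morePrecisionUdf, expressed with a built-in cast.
val converted = toyDf.withColumn("MorePrecisionSalePrice", $"SalePrice".cast(FloatType))

converted.printSchema()
```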

In [None]:
import org.apache.spark.sql.functions.udf

// a val builds the UDF once; a def would rebuild it on every reference
val get_target = udf((future: Float, close: Float) => {
  if (future > close) 1
  else 0
})

val target_df = futureDf.withColumn("target", get_target($"future", $"close"))

In [None]:
target_df.show(5)

Applying a transformation to every column of the data.

In [None]:
import org.apache.spark.sql.functions.{col, upper}

val df = sc.parallelize(
  Seq(("a", "B", "c"), ("D", "e", "F"))).toDF("x", "y", "z")
df.select(df.columns.map(c => upper(col(c)).alias(c)): _*).show