# Spark's Toolset

## Datasets: Type-Safe Structured APIs

In [0]:
import spark.implicits._

// Row type for the typed Dataset API.
// The field names are assumed to mirror the column names of the Parquet file,
// since `.as[Flight]` binds columns to fields by name — TODO confirm against the data.
case class Flight(
  DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME: String,
  count: BigInt
)

// Untyped DataFrame loaded from the 2010 flight-summary Parquet directory.
val flightsDF = spark.read.parquet("/datasets/flight-data/parquet/2010-summary.parquet/")

// Typed Dataset[Flight]; the encoder comes from the `spark.implicits._` import above.
val flights = flightsDF.as[Flight]


## Writing business logic with type-safe functions and DataFrame SQL

In [0]:
// Selecting data - Option 1
// The limit is applied before the predicate, so at most five rows are examined and
// fewer than five may be displayed once the 'Canada' origins are removed.
display(flights.limit(5).where("ORIGIN_COUNTRY_NAME != 'Canada'"))

In [0]:
// Selecting data - Option 2
// Same selection as Option 1, then every remaining row's `count` is increased by 5.
// `withColumn` replaces the existing `count` column and yields an untyped DataFrame.
val updatedFlights = flights
  .limit(5)
  .where("ORIGIN_COUNTRY_NAME != 'Canada'")
  .withColumn("count", $"count" + 5)

display(updatedFlights)

## Structured Streaming

In [0]:
// Load the retail data: every CSV file under by-day/, treating the first line of each
// file as a header and letting Spark infer the column types.
val staticDataFrame = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/datasets/retail-data/by-day/*.csv")

// Expose the data to SQL under the name `retail_data`.
staticDataFrame.createOrReplaceTempView("retail_data")

// Keep the inferred schema so it can be reused by the streaming reader later on.
val staticSchema = staticDataFrame.schema

In [0]:
import org.apache.spark.sql.functions.{window, column, desc, col}

// Batch aggregation with a time window:
// compute the cost of each invoice line, then total it per customer per one-day
// window of InvoiceDate, and print the first five result rows.
staticDataFrame
  .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .show(5)

In [0]:
// Use only 5 shuffle partitions for the aggregations that follow.
spark.conf.set("spark.sql.shuffle.partitions", "5")

// Streaming counterpart of the static read: same CSV files and the schema captured
// from the static DataFrame, with at most one new file picked up per trigger.
val streamingDataFrame = spark.readStream
  .format("csv")
  .schema(staticSchema)
  .option("header", "true")
  .option("maxFilesPerTrigger", 1)
  .load("/datasets/retail-data/by-day/*.csv")

// Evaluates to true when the DataFrame is backed by a streaming source.
streamingDataFrame.isStreaming

In [0]:
// Streaming version of the windowed aggregation: total cost per customer per window.
// NOTE(review): the window is "1 day" although the value is named "...PerHour" — confirm
// which of the two is intended.
val purchaseByCustomerPerHour = streamingDataFrame
  .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
  .groupBy($"CustomerId", window($"InvoiceDate", "1 day"))
  .sum("total_cost")

In [0]:
// Launch the streaming query. Results are kept in an in-memory table that can be
// queried by name with Spark SQL.
purchaseByCustomerPerHour.writeStream
  .queryName("customer_purchases") // name of the in-memory table
  .format("memory")                // memory sink = results stored as an in-memory table
  .outputMode("complete")          // complete = the whole result table is rewritten each trigger
  .start()

In [0]:
// With the stream running, read the in-memory table it feeds and print the five
// rows with the largest summed cost. The table only holds what the stream has
// processed so far, so re-running this cell can give different results.
spark
    .sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")
    .show(5)

## Machine Learning with K-Means

In [0]:
// Inspect the DataFrame structure: prints the schema of the static retail data
// (column names and types) as a tree.
staticDataFrame.printSchema()

In [0]:
import org.apache.spark.sql.functions.date_format

// Prepare the data for the ML stages:
//  - replace nulls in numeric columns with 0,
//  - derive the full day-of-week name ("EEEE" pattern) from InvoiceDate,
//  - reduce the DataFrame to at most 5 partitions.
val preppedDataFrame = staticDataFrame
  .na.fill(0)
  .withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
  .coalesce(5)

In [0]:
// Split by invoice date: rows before 2011-02-01 are used for training,
// rows from that date onwards for testing.
val trainDataFrame = preppedDataFrame.filter("InvoiceDate < '2011-02-01'")

val testDataFrame = preppedDataFrame.filter("InvoiceDate >= '2011-02-01'")

// Row counts of the two splits.
trainDataFrame.count()
testDataFrame.count()

In [0]:
import org.apache.spark.ml.feature.StringIndexer

// StringIndexer stage: maps each day_of_week string to a numeric index,
// written to day_of_week_index.
val indexer = new StringIndexer()
  .setInputCol("day_of_week")
  .setOutputCol("day_of_week_index")

In [0]:
import org.apache.spark.ml.feature.OneHotEncoder

// OneHotEncoder stage: turns the numeric day index into a one-hot vector column,
// so the index values are not treated as ordered magnitudes.
val encoder = new OneHotEncoder()
  .setInputCol("day_of_week_index")
  .setOutputCol("day_of_week_encoded")

In [0]:
import org.apache.spark.ml.feature.VectorAssembler

// VectorAssembler stage: combines the price, the quantity and the encoded day of week
// into the single `features` vector column.
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
  .setOutputCol("features")

In [0]:
import org.apache.spark.ml.Pipeline

// Chain the three feature stages into one pipeline, in the order they must run:
// index the day name, one-hot encode the index, assemble the feature vector.
val transformatPipeline = new Pipeline()
  .setStages(Array(indexer, encoder, vectorAssembler))

// Fit on the training split only.
val fittedPipeline = transformatPipeline.fit(trainDataFrame)

// Apply the fitted stages to the training data.
val transformedTraining = fittedPipeline.transform(trainDataFrame)

// Mark the transformed training set for caching, since the model fit and the cost
// computation both read it.
transformedTraining.cache()

In [0]:
import org.apache.spark.ml.clustering.KMeans

// Training the model
// KMeans is a class without a companion `apply`, so it must be instantiated with `new`.
val kmeans = new KMeans()
  .setK(20)    // number of clusters
  .setSeed(1L) // fixed seed for the random initialisation

// Fit the clustering model on the transformed training data.
val kmModel = kmeans.fit(transformedTraining)

// Cost of the fitted model on the training set.
// NOTE(review): computeCost was deprecated in Spark 2.4 and removed in Spark 3.0 — confirm
// the target Spark version; on 3.x use kmModel.summary.trainingCost or ClusteringEvaluator.
kmModel.computeCost(transformedTraining)

In [0]:
// Evaluate on held-out data: run the test split through the pipeline that was fitted
// on the training split, then compute the model's cost on the result.
val transformedTest = fittedPipeline.transform(testDataFrame)

// NOTE(review): computeCost is deprecated since Spark 2.4 — confirm the Spark version in use.
kmModel.computeCost(transformedTest)

# End