In [1]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [2]:
val data_path = "/home/jovyan/work/guide/data/"

data_path = /home/jovyan/work/guide/data/


/home/jovyan/work/guide/data/

In [7]:
// Datasets

// PATH  /home/jovyan/work/guide/

case class Flight(DEST_COUNTRY_NAME: String,
                 ORIGIN_COUNTRY_NAME: String,
                 count: BigInt)

val flightsDF = spark.read
                .parquet("/home/jovyan/work/guide/data/flight-data/parquet/2010-summary.parquet")



defined class Flight
flightsDF = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


lastException: Throwable = null


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

In [8]:
flightsDF.take(5)

Array([United States,Romania,1], [United States,Ireland,264], [United States,India,69], [Egypt,United States,24], [Equatorial Guinea,United States,1])

In [9]:
val flights = flightsDF.as[Flight]

flights = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

In [10]:
flights
    .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
    .map(flight_row => flight_row)
    .take(5)

Array(Flight(United States,Romania,1), Flight(United States,Ireland,264), Flight(United States,India,69), Flight(Egypt,United States,24), Flight(Equatorial Guinea,United States,1))

In [13]:
flights
    .take(5)
    .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
    .map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))

Array(Flight(United States,Romania,6), Flight(United States,Ireland,269), Flight(United States,India,74), Flight(Egypt,United States,29), Flight(Equatorial Guinea,United States,6))

In [14]:
flights
    .take(5)
    .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
    .map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count))

Array(Flight(United States,Romania,1), Flight(United States,Ireland,264), Flight(United States,India,69), Flight(Egypt,United States,24), Flight(Equatorial Guinea,United States,1))

## Structured streaming

In [3]:
val staticDataFrame = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(data_path + "retail-data/by-day/*.csv")

staticDataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]


[InvoiceNo: string, StockCode: string ... 6 more fields]

In [4]:
staticDataFrame.createOrReplaceTempView("retail_data")

val staticSchema = staticDataFrame.schema

staticSchema = StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))


StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))

In [21]:
import org.apache.spark.sql.functions.{window, column, desc, col}
staticDataFrame
    .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 00:00...|            -37.6|
|   14126.0|[2011-11-29 00:00...|643.6300000000001|
|   13500.0|[2011-11-16 00:00...|497.9700000000001|
|   17160.0|[2011-11-08 00:00...|516.8499999999999|
|   15608.0|[2011-11-11 00:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [26]:
spark.sql("""
SELECT CustomerId, (UnitPrice * Quantity) as total_cost, InvoiceDate, 
from retail_data
sum(total_cost)
LIMIT 5
""")

lastException = null


Name: org.apache.spark.sql.catalyst.parser.ParseException
Message: 
mismatched input 'sum' expecting <EOF>(line 4, pos 0)

== SQL ==

SELECT CustomerId, (UnitPrice * Quantity) as total_cost, InvoiceDate, 
from retail_data
sum(total_cost)
^^^
LIMIT 5

StackTrace: mismatched input 'sum' expecting <EOF>(line 4, pos 0)
== SQL ==
SELECT CustomerId, (UnitPrice * Quantity) as total_cost, InvoiceDate,
from retail_data
sum(total_cost)
^^^
LIMIT 5
  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)

In [29]:
val streamingDataFrame = spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load(data_path + "/retail-data/by-day/*.csv")

streamingDataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]


[InvoiceNo: string, StockCode: string ... 6 more fields]

In [30]:
// check wheter the DataFrame is streaming
streamingDataFrame.isStreaming

true

In [32]:
// Create Window operation

val purchaseByCustomerPerHour = streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(
        $"CustomerId", window($"InvoiceDate", "1 day"))
    .sum("total_cost")

purchaseByCustomerPerHour = [CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]


[CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]

In [33]:
// Push DataFrame-stream to a in-memor table

purchaseByCustomerPerHour
    .writeStream
    .format("memory") // memory = store in-memory table
    .queryName("customer_purchases") // name of the in-memory table
    .outputMode("complete") // complete = all the counts should be in the table
    .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3bd64718

In [34]:
// Once we've started the stream, we can run queries against it

spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")
.show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|      null|[2011-11-16 00:00...|17518.40999999992|
|   12415.0|[2011-03-03 00:00...|         16558.14|
|   15769.0|[2011-03-17 00:00...|          10065.0|
|   17450.0|[2011-11-03 00:00...|          9069.82|
|      null|[2011-03-17 00:00...|7876.000000000018|
+----------+--------------------+-----------------+
only showing top 5 rows



In [36]:
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|[2011-11-14 00:00...|          55316.08|
|      null|[2011-11-16 00:00...| 17518.40999999992|
|   12415.0|[2011-03-03 00:00...|          16558.14|
|   14646.0|[2011-08-09 00:00...|12498.709999999997|
|   15769.0|[2011-03-17 00:00...|           10065.0|
+----------+--------------------+------------------+
only showing top 5 rows



In [37]:
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|[2011-11-14 00:00...|          55316.08|
|   17949.0|[2011-06-30 00:00...|18854.780000000002|
|      null|[2011-11-16 00:00...| 17518.40999999992|
|   12415.0|[2011-03-03 00:00...|          16558.14|
|   14646.0|[2011-08-09 00:00...|12498.709999999997|
+----------+--------------------+------------------+
only showing top 5 rows



In [38]:
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|[2011-11-14 00:00...|          55316.08|
|   15749.0|[2011-01-11 00:00...|           22998.4|
|   17949.0|[2011-06-30 00:00...|18854.780000000002|
|   17450.0|[2011-01-11 00:00...|18620.199999999993|
|      null|[2011-11-16 00:00...| 17518.40999999992|
+----------+--------------------+------------------+
only showing top 5 rows



In [40]:
// write to the console // tty

purchaseByCustomerPerHour
    .writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .start()

lastException: Throwable = null


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5a6f63a5

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14239.0|[2011-03-03 00:00...|             -56.1|
|   17700.0|[2011-03-03 00:00...| 602.6099999999999|
|   15932.0|[2011-03-03 00:00...|             -7.65|
|   16191.0|[2011-03-03 00:00...|             -1.65|
|   17646.0|[2011-03-03 00:00...|            345.85|
|   18041.0|[2011-03-03 00:00...|            148.49|
|   18102.0|[2011-03-03 00:00...|            1396.0|
|   13630.0|[2011-03-03 00:00...|             -14.4|
|   17652.0|[2011-03-03 00:00...|             222.3|
|   17567.0|[2011-03-03 00:00...|            535.38|
|   15596.0|[2011-03-03 00:00...|            303.03|
|   13476.0|[2011-03-03 00:00...| 727.5999999999999|
|   14524.0|[2011-03-03 00:00...|            210.05|
|   12500.0|[2011-03-03 00:00...|            249.84|
| 

-------------------------------------------
Batch: 6
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17218.0|[2010-12-19 00:00...|              99.5|
|   15433.0|[2011-03-17 00:00...| 372.6399999999999|
|   15932.0|[2011-03-03 00:00...|             -7.65|
|   17368.0|[2011-01-06 00:00...| 563.1500000000001|
|   12514.0|[2011-03-17 00:00...|1017.6800000000002|
|   15602.0|[2011-11-17 00:00...|              46.8|
|   12753.0|[2011-01-06 00:00...|            -109.8|
|   16839.0|[2011-03-17 00:00...|449.87000000000006|
|   15399.0|[2011-11-17 00:00...|243.95999999999998|
|   17652.0|[2011-03-03 00:00...|             222.3|
|   15379.0|[2011-01-06 00:00...| 660.1500000000002|
|   12847.0|[2011-11-17 00:00...| 871.5399999999998|
|   14312.0|[2011-01-06 00:00...|            309.55|
|   14034.0|[2011-11-17 00:00...|177.90999999999994|
| 

-------------------------------------------
Batch: 12
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15433.0|[2011-03-17 00:00...| 372.6399999999999|
|   15447.0|[2011-01-13 00:00...|            155.17|
|   15932.0|[2011-03-03 00:00...|             -7.65|
|   17368.0|[2011-01-06 00:00...| 563.1500000000001|
|   13140.0|[2011-08-23 00:00...|114.06000000000002|
|   12514.0|[2011-03-17 00:00...|1017.6800000000002|
|   12753.0|[2011-01-06 00:00...|            -109.8|
|   15804.0|[2011-08-23 00:00...|432.47999999999996|
|   16839.0|[2011-03-17 00:00...|449.87000000000006|
|   15399.0|[2011-11-17 00:00...|243.95999999999998|
|   17652.0|[2011-03-03 00:00...|             222.3|
|   14312.0|[2011-01-06 00:00...|            309.55|
|   14034.0|[2011-11-17 00:00...|177.90999999999994|
|   13014.0|[2011-11-03 00:00...|347.34999999999997|
|

-------------------------------------------
Batch: 18
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   15805.0|[2011-11-16 00:00...|              370.4|
|   15932.0|[2011-03-03 00:00...|              -7.65|
|   17368.0|[2011-01-06 00:00...|  563.1500000000001|
|   17858.0|[2011-03-16 00:00...| 432.20000000000005|
|   15694.0|[2011-03-16 00:00...|             584.76|
|   16839.0|[2011-03-17 00:00...| 449.87000000000006|
|   15399.0|[2011-11-17 00:00...| 243.95999999999998|
|   17652.0|[2011-03-03 00:00...|              222.3|
|   14060.0|[2011-11-17 00:00...|             412.56|
|   17460.0|[2011-03-16 00:00...|             138.88|
|   16525.0|[2011-11-02 00:00...|             863.04|
|   18102.0|[2011-03-03 00:00...|             1396.0|
|   13500.0|[2011-11-16 00:00...|  497.9700000000001|
|   15068.0|[2011-03-28 00:00...| 239.

Machine Learning and Advanced Analytics

In [5]:
// Verify dataframe's schema
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [6]:
import org.apache.spark.sql.functions.date_format

In [26]:
// transform data
val preppedDataFrame = staticDataFrame
    .na.fill(0) // fill null values with 0's
    .withColumn("day_of_week", date_format($"InvoiceDate", "EEEE")) // add column
    .coalesce(5) // reduce the number of partitions in the DataFrame. 

preppedDataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]


lastException: Throwable = null


[InvoiceNo: string, StockCode: string ... 7 more fields]

In [27]:
// split data train/test sets
val trainDataFrame = preppedDataFrame
    .where("InvoiceDate < '2011-07-01'")


val testDataFrame = preppedDataFrame
    .where("InvoiceDate >= '2011-07-01'")

trainDataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]
testDataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]


[InvoiceNo: string, StockCode: string ... 7 more fields]

In [11]:
trainDataFrame.count()

245903

In [12]:
testDataFrame.count()

296006

In [28]:
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
    .setInputCol("day_of_week")
    .setOutputCol("day_of_week_index")

indexer = strIdx_65af8c86d335


strIdx_65af8c86d335

In [29]:
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
    .setInputCol("day_of_week_index")
    .setOutputCol("day_of_week_encoded")

encoder = oneHot_9c3f8ddb0711




oneHot_9c3f8ddb0711

In [30]:
// create vector to assemble model input

import org.apache.spark.ml.feature.VectorAssembler

val vectorAssembler = new VectorAssembler()
    .setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
    .setOutputCol("features")

vectorAssembler = vecAssembler_283cd0df8f0e


vecAssembler_283cd0df8f0e

In [31]:
// Setup pipeline to that any future data we need to transform can go through the same process

import org.apache.spark.ml.Pipeline

In [32]:
val transformationPipeline = new Pipeline()
    .setStages(Array(indexer, encoder, vectorAssembler))

transformationPipeline = pipeline_65525314e0b9


pipeline_65525314e0b9

In [33]:
val fittedPipeline = transformationPipeline.fit(trainDataFrame)

fittedPipeline = pipeline_65525314e0b9


pipeline_65525314e0b9

In [39]:
val transformedTraining = fittedPipeline.transform(trainDataFrame)

transformedTraining = [InvoiceNo: string, StockCode: string ... 10 more fields]


[InvoiceNo: string, StockCode: string ... 10 more fields]

In [40]:
// set cache

transformedTraining.cache()

[InvoiceNo: string, StockCode: string ... 10 more fields]

In [42]:
// import and instantiate model

import org.apache.spark.ml.clustering.KMeans



In [43]:
val kmeans = new KMeans()
    .setK(20)
    .setSeed(42)

kmeans = kmeans_d2e1de5cad7d


kmeans_d2e1de5cad7d

In [44]:
val kmModel = kmeans.fit(transformedTraining)

kmModel = kmeans_d2e1de5cad7d


kmeans_d2e1de5cad7d

In [45]:
kmModel.computeCost(transformedTraining)



7.677936986812846E7

In [46]:
val transformedTest = fittedPipeline.transform(testDataFrame)

transformedTest = [InvoiceNo: string, StockCode: string ... 10 more fields]


[InvoiceNo: string, StockCode: string ... 10 more fields]

In [48]:
kmModel.computeCost(transformedTest)



5.3571962393569505E8

## Lower-level APIs


In [51]:
spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()

[value: int]