## Introduction to Spark

In [18]:
//!rm -rf ~/data/flight-retail-data.zip
//!mkdir -p ~/data
//!wget https://sadatashareagsparkml.blob.core.windows.net/hadoop-bangalore/flight-retail-data.zip -O ~/data/flight-retail-data.zip
//!unzip -n ~/data/flight-retail-data.zip -d ~/data/

In [26]:
!ls ~/data/flight-retail-data/flight-data/csv/2015-summary.csv

/home/atingupta2005/data/flight-retail-data/flight-data/csv/2015-summary.csv



In [29]:
// in Scala
// Read the 2015 flight summary CSV into a DataFrame, using the first line as
// the header and asking Spark to infer the column types.
val flightData2015 = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/home/atingupta2005/data/flight-retail-data/flight-data/csv/2015-summary.csv")

flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [30]:
// Return the first three rows of the DataFrame as an Array[Row].
flightData2015.take(3)

res5: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344])


In [31]:
// Print the physical plan for sorting the data by the count column.
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#48 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#48 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#133]
      +- FileScan csv [DEST_COUNTRY_NAME#46,ORIGIN_COUNTRY_NAME#47,count#48] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/atingupta2005/data/flight-retail-data/flight-data/csv/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [32]:
// Set spark.sql.shuffle.partitions to 5 (the sort plan printed earlier shows
// 200 partitions in its Exchange step).
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [33]:
// Sort by the count column and return the first two rows.
flightData2015.sort("count").take(2)

res8: Array[org.apache.spark.sql.Row] = Array([United States,Singapore,1], [Moldova,United States,1])


In [34]:
// Register the DataFrame as the temp view flight_data_2015 so that it can be
// referenced by name from spark.sql queries.
flightData2015.createOrReplaceTempView("flight_data_2015")

In [35]:
// in Scala
// Count the rows for each destination country with a SQL query against the
// flight_data_2015 temp view.
val sqlWay = {
  val rowsPerDestinationSql = """
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
"""
  spark.sql(rowsPerDestinationSql)
}


sqlWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count(1): bigint]


In [36]:
// Build the same per-destination row count with the DataFrame API so that its
// physical plan can be compared with the SQL version.
// The grouping column is named with a plain String, as in the other groupBy
// calls in this notebook, instead of a Symbol literal: Symbol literals are
// deprecated since Scala 2.13 and only work as columns through
// spark.implicits._.
val dataFrameWay = flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .count()

// explain() is a side-effecting method declared with an empty parameter list,
// so it is called with parentheses.
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 5), ENSURE_REQUIREMENTS, [id=#155]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/atingupta2005/data/flight-retail-data/flight-data/csv/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 5), ENSURE_REQUIREMENTS, [id=#168]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#46] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFile

dataFrameWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count: bigint]


In [37]:
// Query the temp view for the largest value in the count column; take(1)
// returns the single result row in an array.
spark.sql("SELECT max(count) from flight_data_2015").take(1)

res11: Array[org.apache.spark.sql.Row] = Array([370002])


In [38]:
// in Scala
import org.apache.spark.sql.functions.max


import org.apache.spark.sql.functions.max


In [39]:
// Compute the maximum of the count column through the DataFrame API and print
// the result as a table.
flightData2015.select(max("count")).show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [40]:
// in Scala
// Five destination countries with the largest sum of the count column,
// expressed as SQL against the flight_data_2015 temp view.
val maxSql = {
  val topDestinationsSql = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
  spark.sql(topDestinationsSql)
}

// Print the five result rows.
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



maxSql: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, destination_total: bigint]


In [41]:
// in Scala
import org.apache.spark.sql.functions.desc

{
  // Sum the count column per destination and give the aggregate a readable name.
  val totalsByDestination = flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")

  // Largest totals first; print only the top five.
  totalsByDestination
    .orderBy(desc("destination_total"))
    .limit(5)
    .show()
}

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



import org.apache.spark.sql.functions.desc


In [42]:
// in Scala
// Print the physical plan of the top-five-destinations pipeline instead of
// running it to a result table.
flightData2015
  .groupBy("DEST_COUNTRY_NAME").sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .orderBy(desc("destination_total"))
  .limit(5)
  .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#156L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#46,destination_total#156L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[sum(count#48)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#46, 5), ENSURE_REQUIREMENTS, [id=#368]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#46], functions=[partial_sum(count#48)])
            +- FileScan csv [DEST_COUNTRY_NAME#46,count#48] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/atingupta2005/data/flight-retail-data/flight-data/csv/2015-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [45]:
// in Scala
import spark.implicits._

/** One row of the flight summary data: destination country, origin country
  * and a count.
  *
  * NOTE(review): Spark encodes BigInt as decimal(38,0); Long may be a closer
  * match if the Parquet count column is a 64-bit integer - confirm against
  * the file's schema before changing.
  */
case class Flight(
  DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME: String,
  count: BigInt
)

// Load the 2010 flight summary from Parquet as an untyped DataFrame.
val flightsDF = spark.read
  .format("parquet")
  .load("/home/atingupta2005/data/flight-retail-data/flight-data/parquet/2010-summary.parquet/")

// View the same data as a typed Dataset[Flight].
val flights = flightsDF.as[Flight]

import spark.implicits._
defined class Flight
flightsDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
flights: org.apache.spark.sql.Dataset[Flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [46]:
// Evaluate the DataFrame reference so the notebook echoes its type and columns.
flightsDF

res17: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [47]:
// Print the leading rows of the typed Dataset as a table.
flights.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [34]:
// Keep the flights whose origin is not Canada, then produce a new Flight for
// each remaining row with its count increased by 5.
flights
  .filter(_.ORIGIN_COUNTRY_NAME != "Canada")
  .map(kept => kept.copy(count = kept.count + 5))

<console>: 35: error: value === is not a member of String

In [32]:
// Print the leading rows of the typed Dataset as a table.
flights.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [48]:
// in Scala
// Read every per-day retail CSV matched by the glob into a single DataFrame,
// using the header row for column names and inferring the column types.
val staticDataFrame = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("../data/retail-data/by-day/*.csv")

// Expose the data to SQL as retail_data and keep its schema in a val.
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema

staticDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,StringType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))


In [52]:
// Select every column of the retail_data view for rows whose Country is
// 'United Kingdom' and print the result as a table.
spark.sql("""
select * from retail_data where Country = 'United Kingdom'
""").show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [53]:
// in Scala
import org.apache.spark.sql.functions.{window, column, desc, col}

{
  // Per-line cost (UnitPrice * Quantity) together with the customer and the
  // invoice timestamp.
  val purchases = staticDataFrame.selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")

  // Group by customer and by a "1 day" window over InvoiceDate.
  val perCustomerPerDay = purchases.groupBy(
    col("CustomerId"),
    window(col("InvoiceDate"), "1 day"))

  // Sum the cost within each group and print five rows.
  perCustomerPerDay.sum("total_cost").show(5)
}

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|{2011-12-05 00:00...|316.78000000000003|
|   18180.0|{2011-12-05 00:00...|            310.73|
|   15358.0|{2011-12-05 00:00...| 830.0600000000003|
|   15392.0|{2011-12-05 00:00...|304.40999999999997|
|   15290.0|{2011-12-05 00:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



import org.apache.spark.sql.functions.{window, column, desc, col}


In [54]:
// Set spark.sql.shuffle.partitions to 5 (the same value an earlier cell in
// this notebook already applied).
spark.conf.set("spark.sql.shuffle.partitions", "5")