### grp

# Spark: The Definitive Guide

## PART 2: Structured APIs - DataFrames, SQL, and Datasets

## dataPaths

In [1]:
flightDataJson2015 = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/json/2015-summary.json'
flightDataJson = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/json/*-summary.json'
retailData20101201 = '/Users/grp/sparkTheDefinitiveGuide/data/retail-data/by-day/2010-12-01.csv'
retailDataAll = '/Users/grp/sparkTheDefinitiveGuide/data/retail-data/all/*.csv'
flightDataCSV2010 = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/csv/2010-summary.csv'
flightDataJson2010 = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/json/2010-summary.json'
flightDataParquet2010 = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/parquet/2010-summary.parquet'
flightDataORC2010 = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/orc/2010-summary.orc'
sqliteJDBC = '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/jdbc/my-sqlite.db'

In [2]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

## _Chapter #9 - Data Sources_

-  **6 "Core" Data Sources**:
    -  CSV (parameters listed in table 9-3):
        -  _does not support complex types (ex: array, nested data)_
    -  JSON (parameters listed in table 9-4):
        -  multiLine (allows for reading in non-line-delimited JSON files)
    -  Parquet (parameters listed in table 9-4):
        -  columnar compression for saving storage space and optimized select querying
        -  default file format for Apache Spark
        -  supports complex types (ex: array)
        -  enforces own schema when storing data as schema is built into the file itself (so no inference needed)
    -  ORC (no parameter options):
        -  ORC is further optimized for Hive however works very well with Spark
        -  optimized for large streaming reads and finds rows quickly
        -  similar to parquet
    -  JDBC/ODBC connections (parameters listed in table 9-6):
        - JDBC read/write:
            -  need JDBC driver for particular database on spark classpath
            -  provide proper JAR for driver
    -  TXT:
        -  each line in file is a record in DF
        -  writing to TXT requires source to have ONLY 1 string column or else write will fail   
    <br>
-  **Community-Created Data Sources**:
    -  Cassandra / HBase / MongoDB / Redshift
    -  XML / Avro
    
#### Structure for READING data: DataFrameReader.format(...).option("key", "value").schema(...).load(...)

-  **Reading Data** (spark.read):
    -  format
    -  schema
    -  read mode:
        -  _permissive_ (sets all fields to null when encountering a corrupted reocrd / places corrupted records in column (_corrupt_record)
        -  _dropMalformed_ (drops rows containing malformed records)
        -  _failFast_ (fails immediately when encountering malformed records)
    -  options
    
#### Structure for WRITING data: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(...)

-  **Writing Data** (dataFrame.write):
    -  format
    -  options
    -  save mode:
        -  _append_ (appends output files to the list of files that already exist at that location path)
        -  _overwrite_ (completely overwrites files at location path)
        -  _errorIfExists_ (shows an error and fails to write if files already exist at that specified location path)
        -  _ignore_ (if files exist at the location path do nothing with current DF)

### Advanced I/O Concepts:
-  Spark Developers have the ability to control the parallelism of files written by controlling the partitions prior to writing:
    -  bucketing
    -  partitioning   

### Splittable File Types and Compression:
-  parquet w/ GZIP compression is recommended

### Reading Data in Parallel:
-  multiple executors cannot read from the same file at the same time however can read different files at the same time
-  each file in a folder becomes a partition in a DF and will be read by available executors in parallel

### Writing Data in Parallel:
-  _number of files written (output) is dependent on the number of partitions the DF object has when you write out the data_
-  by default, 1 file is written / partition of the data

#### File Organization Approach (methods for controlling the data that is specifically written to each output file):
#### 1. Partitioning "Partition By":
-  encodes a column as a folder
-  output files contain data based on partitionBy predicate

#### 2. Bucketing:
-  groups data by bucket ID
-  helps with avoiding shuffles when joining or aggregating
-  supported only for Spark managed tables

### Writing Complex Types:
-  the best file format often depends on the type of data being read and processed

### Managing File Size:
-  try to avoid "lots of small files" aka "small file problem" because it requires a lot of metadata to manage
-  Spark does not work well with many small files or many large files
-  "maxRecordsPerFile" helps will controlling the file size / # of records written to each file (ex: df.write.option("maxRecordsPerFile", 5000))
-  always be aware of the # of partitions at write time

### _Chapter #9 Exercises (Data Sources)_

### _CSV Read Example_

In [228]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [229]:
myManualSchema = StructType([\
    StructField("DEST_COUNTRY_NAME", StringType(), True),\
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),\
    StructField("count", LongType(), False)])

In [230]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.schema(myManualSchema)\
.load(flightDataCSV2010)

In [231]:
csvFile.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _CSV Write Example_

In [232]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/csvWrite.csv")

In [233]:
# reflects # of files outputed to target save directory
csvFile.rdd.getNumPartitions()

1

In [234]:
# 1 row per line tab delimited
!head /Users/grp/sparkTheDefinitiveGuide/tmp/csvWrite.csv/*.csv

United States	Romania	1
United States	Ireland	264
United States	India	69
Egypt	United States	24
Equatorial Guinea	United States	1
United States	Singapore	25
United States	Grenada	54
Costa Rica	United States	477
Senegal	United States	29
United States	Marshall Islands	44


### _JSON Read Example_

In [235]:
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load(flightDataJson2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _JSON Write Example_

In [236]:
csvFile.write.format("json").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/jsonWrite.json")

In [237]:
# 1 JSON object per line
!head /Users/grp/sparkTheDefinitiveGuide/tmp/jsonWrite.json/*.json

{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":1}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Ireland","count":264}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"India","count":69}
{"DEST_COUNTRY_NAME":"Egypt","ORIGIN_COUNTRY_NAME":"United States","count":24}
{"DEST_COUNTRY_NAME":"Equatorial Guinea","ORIGIN_COUNTRY_NAME":"United States","count":1}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Singapore","count":25}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Grenada","count":54}
{"DEST_COUNTRY_NAME":"Costa Rica","ORIGIN_COUNTRY_NAME":"United States","count":477}
{"DEST_COUNTRY_NAME":"Senegal","ORIGIN_COUNTRY_NAME":"United States","count":29}
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Marshall Islands","count":44}


### _Parquet Read Example_

In [238]:
spark.read.format("parquet")\
.load(flightDataParquet2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _Parquet Write Example_

In [239]:
csvFile.write.format("parquet").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/parquetWrite.parquet")

In [240]:
# 1 parquet file
!head /Users/grp/sparkTheDefinitiveGuide/tmp/parquetWrite.parquet/*.parquet

PAR1��L�  �h   United States   Egypt	�quatorial Guinea
   Costa Rica   Senegal   GuyanaAMalt�@Bolivia   Anguilla   Turks and Caicos Islands    Saint Vincent $0the Grenadine�ItalyUPakistan�HIceland   MarshallX�$Luxembourg9 Honduras�he Baham	(El Salvador�Samo�Kazakhn5Switzerr !�$Maarten	  `g Kongarinidad�Tobago!0Lat-Slovak)(Suriname"Mexico�Ecu��@Colombia   Norwa%
Thai�}Venezuel#Panam%�Morocco�Antigua�Barbud� Azerbaija�New ZeaV�LiberiJHungary�Sweden
Israel�EthiopiS$Martinique!�)� Barthelem��adosdGermany!�Kyrgyz!oIr%�^Malays) CypruE8Qatar   Fiji QHKittIjNevis�Taiwan!�aitiKuwait
CanadaHederay, of Micrones)pJamaica   Dominican Republic`JapI�Fin!FAruba`renchaX ai5XIndia   British VirginQ��Brazil!I>Poly� u�Arab Emi� sA SingaporeA�NetheE�%7Chin%�Denmark!BPeru8DArgentina   Cayma� !a(outh Africa�Spain�c  Aille%�Bermud�KiribatiHaudi�

### _ORC Read Example_

In [241]:
spark.read.format("orc")\
.load(flightDataORC2010).show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
+-----------------+-------------------+-----+
only showing top 3 rows



### _ORC Write Example_

In [242]:
csvFile.write.format("orc").mode("overwrite")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/orcWrite.orc")

In [243]:
# 1 orc file
!head /Users/grp/sparkTheDefinitiveGuide/tmp/orcWrite.orc/*.orc

ORC  
�P S  
'
    �"
AfghanistanVietnam�,P S  
'
    �"
AfghanistanVietnam�,P 5  

   �


### _SQLite SQL Database Example_

In [244]:
'''
run via shell: 
pyspark \
--master local[8] \
--driver-class-path /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar \
--jars /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar
'''

'\nrun via shell: \npyspark --master local[8] --driver-class-path /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar --jars /Users/grp/sparkTheDefinitiveGuide/sqlite-jdbc-3.8.6.jar\n'

In [245]:
driver = "org.sqlite.JDBC"
path = sqliteJDBC
url = "jdbc:sqlite:" + path
tablename = "flight_info"

In [246]:
dbDataFrame = spark.read\
.format("jdbc").option("url", url).option("dbtable", tablename).option("driver",  driver).load()

In [247]:
dbDataFrame.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: decimal(20,0) (nullable = true)



In [248]:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show(5)

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|   Equatorial Guinea|
|             Bolivia|
|Turks and Caicos ...|
|            Pakistan|
|    Marshall Islands|
+--------------------+
only showing top 5 rows



### _Additional Options Example_

In [249]:
'''
pgDF = spark.read.format("jdbc")\
  .option("driver", "org.postgresql.Driver")\
  .option("url", "jdbc:postgresql://database_server")\
  .option("dbtable", "schema.tablename")\
  .option("user", "username").option("password", "my-secret-password").load()
'''

'\npgDF = spark.read.format("jdbc")  .option("driver", "org.postgresql.Driver")  .option("url", "jdbc:postgresql://database_server")  .option("dbtable", "schema.tablename")  .option("user", "username").option("password", "my-secret-password").load()\n'

### _Query Pushdown  Example_

In [250]:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#8455], functions=[])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#8455, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#8455], functions=[])
      +- *(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8455] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


### _Query Pushdown Filter To DB Example_

In [251]:
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()

== Physical Plan ==
*(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8455,ORIGIN_COUNTRY_NAME#8456,count#8457] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>


### _Specify SQL Query (Query Result of Query) Example_

In [252]:
pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
dbDataFrame = spark.read\
.format("jdbc").option("url", url).option("dbtable", pushdownQuery).option("driver",  driver).load()

In [253]:
dbDataFrame.explain()

== Physical Plan ==
*(1) Scan JDBCRelation((SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#8471] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


### _Parallel Reads From DB Example_

In [254]:
# numPartitions specifies max num of partitions to how much spark is reading and writing in parallel
# partitions = level of parallelism
dbDataFrame = spark.read.format("jdbc")\
.option("url", url).option("dbtable", tablename).option("driver",  driver).option("numPartitions", 10).load()

### _Parallel Predicate Pushdown To DB Example_

In [255]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
  "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()
spark.read.jdbc(url,tablename,predicates=predicates,properties=props)\
  .rdd.getNumPartitions() # 2

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Sweden|      United States|   65|
|    United States|             Sweden|   73|
|         Anguilla|      United States|   21|
|    United States|           Anguilla|   20|
+-----------------+-------------------+-----+



2

### _Parallel Predicate Pushdown To DB w/ Duplicate Rows Example_

In [256]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
  "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).count()

510

In [257]:
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).rdd.getNumPartitions() #2

2

### _Sliding Window Partition Example_

In [258]:
# min and max partition
colName = "count"
lowerBound = 0
upperBound = 348113
numPartitions = 10

spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).count() # 255

255

In [259]:
spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).rdd.getNumPartitions() #10

10

### _Writing To SQL DB Example_

In [260]:
newPath = "jdbc:sqlite://Users/grp/sparkTheDefinitiveGuide/tmp/sqlWrite.db"
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)

In [261]:
# 1 db file
!head /Users/grp/sparkTheDefinitiveGuide/tmp/sqlWrite.db

SQLite format 3   @     &                                                             & -�   x x                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          �##�Qtableflight_infoflight_infoCREATE TABLE flight_info ("DEST_COUNTRY_NAME" TEXT , "ORIGIN_COUNTRY_NAME"

### _Reading Results Example_

In [262]:
spark.read.jdbc(newPath, tablename, properties=props).count()

255

### _Append Write/Re-read Example_

In [263]:
csvFile.write.jdbc(newPath, tablename, mode="append", properties=props)
spark.read.jdbc(newPath, tablename, properties=props).count()

510

### _TXT Read Example_

In [264]:
spark.read.text(flightDataCSV2010).selectExpr("split(value, ',') as rows").show(3, False)

+-----------------------------------------------+
|rows                                           |
+-----------------------------------------------+
|[DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count]|
|[United States, Romania, 1]                    |
|[United States, Ireland, 264]                  |
+-----------------------------------------------+
only showing top 3 rows



### _TXT Write Example_

In [265]:
csvFile.select("DEST_COUNTRY_NAME").write.mode("overwrite")\
.text("/Users/grp/sparkTheDefinitiveGuide/tmp/txtWrite.txt")

In [266]:
# 1 txt file
!head /Users/grp/sparkTheDefinitiveGuide/tmp/txtWrite.txt/*.txt

United States
United States
United States
Egypt
Equatorial Guinea
United States
United States
Costa Rica
Senegal
United States


### _TXT Write Partition By Example_

In [267]:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count").write.mode("overwrite").partitionBy("count")\
.text("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt")

In [268]:
import os
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt"): print(i)

count=69
._SUCCESS.crc
count=29
count=264
count=44
count=54
count=477
_SUCCESS
count=1
count=24
count=25


In [269]:
# files split by count column
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=1/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=24/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=25/*.txt
print("\n")
!head /Users/grp/sparkTheDefinitiveGuide/tmp/partitionWrite.txt/count=264/*.txt

United States
Equatorial Guinea


Egypt


United States


United States


### _Writing Data in Parallel Example_

In [270]:
csvFile.repartition(5).write.mode("overwrite").format("csv")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv")

In [271]:
# files split into 5 partitions
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv/"): print(i)

part-00004-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv
._SUCCESS.crc
.part-00004-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv.crc
.part-00001-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv.crc
part-00001-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv
.part-00000-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv.crc
part-00000-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv
.part-00002-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv.crc
.part-00003-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv.crc
part-00003-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv
_SUCCESS
part-00002-85c4b81c-1971-41b7-ada5-932b8a53fc45-c000.csv


In [272]:
!head /Users/grp/sparkTheDefinitiveGuide/tmp/repartitionWrite.csv/part-00001*.csv

Barbados,United States,130
United States,Fiji,51
United States,Senegal,46
New Zealand,United States,86
Kiribati,United States,17
Afghanistan,United States,11
Latvia,United States,12
United States,Luxembourg,90
United States,Angola,18
United States,Cyprus,1


### _Partitioning (partitionBy) Example_

In [273]:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
.save("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionByWrite.parquet/")

In [274]:
# files split by predicate
for i in os.listdir("/Users/grp/sparkTheDefinitiveGuide/tmp/partitionByWrite.parquet/"): print(i)

DEST_COUNTRY_NAME=United States
DEST_COUNTRY_NAME=Costa Rica
._SUCCESS.crc
DEST_COUNTRY_NAME=Senegal
DEST_COUNTRY_NAME=Equatorial Guinea
_SUCCESS
DEST_COUNTRY_NAME=Egypt


### _Bucketing Example_

In [275]:
numberBuckets = 10
columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")\
.bucketBy(numberBuckets, columnToBucketBy).mode("overwrite").saveAsTable("bucketedFiles")

In [276]:
# stored in spark-warehouse local dir or /user/hive/warehouse on a cluster
for i in os.listdir("/Users/grp/sparkNotebooks/spark-warehouse/bucketedfiles/"): print(i)

.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00007.c000.snappy.parquet.crc
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00008.c000.snappy.parquet.crc
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00003.c000.snappy.parquet.crc
._SUCCESS.crc
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00000.c000.snappy.parquet.crc
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00004.c000.snappy.parquet.crc
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00006.c000.snappy.parquet
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00007.c000.snappy.parquet
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00004.c000.snappy.parquet
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00002.c000.snappy.parquet.crc
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00005.c000.snappy.parquet
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00003.c000.snappy.parquet
part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00002.c000.snappy.parquet
.part-00000-eda721fc-d305-4db5-941e-c76906a0a539_00009.c000.snappy.p

## _Chapter #10 - Spark SQL_

-  ability to run SQL queries against views or tables in databases
-  SQL or "Structured Query Language" is a language expressing relational operations over data (manipulations, definitions, controls)
-  supports both ANSI-SQL and HiveQL queries
-  intended to operate as an OLAP DB not a OLTP DB for low-latency queries
-  a Spark SQL CLI is available via ./bin/spark-sql
-  ad hoc Spark SQL via SparkSession as spark.sql(...)
-  includes an JDBC/ODBC server started via ./sbin/start-thriftserver.sh

### Spark's Relationship to Hive:
-  connects to Hive metastores (where Hive maintains table information)  
-  configure Hive with Spark by placing hive-site.xml, core-site.xml, and hdfs-site.xml files in conf/

### Catalog:
-  stores metadata about data stored in tables, databases, functions, views
-  available in the _org.apache.spark.sql.catalog.Catalog_ package

### Tables:
-  structure of data
-  hold information - data and metadata (data about the tables)
-  tables are defined in database and dataframes are defined within programming language code
-  tables will be assigned a database (_default_ is default database) ... "show tables in DATABASENAME"
-  Types:
    -  Managed:
        -  saveAsTable on DF
        -  writes to default Hive Warehouse location (/user/hive/warehouse)
    -  Unmanaged:
        - define a table structure from files on disk

### Dropping Tables:
-  dropping a managed table the data and the table definition will be removed
-  "DROP TABLE IF EXISTS" will delete the table and the data if the table exists
-  dropping a unmanaged table the underlying table will be removed however the data will still exist in linked directory

### Metadata:
-  can describe the table's metadata
-  REFRESH TABLE refreshes all cached files linked to table
-  REPAIR TABLE refreshes partitions / collects new partitions maintained in the Catalog

### Views:
-  set of transformations (saved query) on top of an existing table
-  equivalent to creating a new DF from an existing DF
-  when dropping a VIEW the underlying data is not removed just the view definition itself
-  Types:
    -  global [viewable across entire Spark Application however removed at end of session] (GLOBAL TEMPORARY VIEW)
    -  set to a database
    -  per session [only available during current sesion and not registered to a database] (TEMPORARY VIEW)
    
### Databases:
-  organize tables

### Spark SQL Complex Types:
-  structs [provide a way of creating or querying nested data in Spark]
-  lists [array]
-  maps

### Functions:
-  system functions as well as user defined functions

### Subqueries:
-  queries within other queries
-  Kinds:
    -  correlated subqueries [use logic from outer scope of query in inner query to suplement information in subquery]
    -  uncorrelated subqueries [includes no information from the outer scope of the query]
    -  predicate subqueries [filtering based on values]

### _Chapter #10 Exercises (SQL)_

In [277]:
spark.sql("drop database if exists sparkSQL CASCADE")
spark.sql("create database sparkSQL")
spark.sql("use sparkSQL")

DataFrame[]

In [278]:
spark.sql("drop table if exists flights_from_select")
spark.sql("drop table if exists flights")
spark.sql("drop table if exists flights_csv")
spark.sql("drop table if exists hive_flights")
spark.sql("drop table if exists partitioned_flights")
spark.sql("drop table if exists nested_data")
spark.sql("drop table if exists just_usa_view")
spark.sql("drop table if exists just_usa_view_temp")

DataFrame[]

### _Spark DF and SQL Integration Example_

In [279]:
spark.read.json(flightDataJson2015).createOrReplaceTempView("some_sql_view") # DF => SQL

In [280]:
spark.sql\
(
"""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
"""
)\
.where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")\
.count() # SQL => DF

12

### _Creating Spark Managed Tables Example_:
-  USING syntax uses Spark Serialization whereas STORED AS uses Hive SerDe configuration which is much slower

In [281]:
spark.sql\
(
"""
CREATE TABLE flights (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/json/2015-summary.json')
"""
)

DataFrame[]

In [282]:
spark.sql("describe flights").show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|  DEST_COUNTRY_NAME|   string|   null|
|ORIGIN_COUNTRY_NAME|   string|   null|
|              count|   bigint|   null|
+-------------------+---------+-------+



In [283]:
spark.sql("select * from flights").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [284]:
spark.sql\
(
"""
CREATE TABLE flights_csv (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
count LONG) USING csv OPTIONS (header true, path '/Users/grp/sparkTheDefinitiveGuide/data/flight-data/csv/2015-summary.csv')
"""
)

DataFrame[]

In [285]:
spark.sql("describe flights_csv").show(truncate=False)

+-------------------+---------+---------------------------------------+
|col_name           |data_type|comment                                |
+-------------------+---------+---------------------------------------+
|DEST_COUNTRY_NAME  |string   |null                                   |
|ORIGIN_COUNTRY_NAME|string   |remember, the US will be most prevalent|
|count              |bigint   |null                                   |
+-------------------+---------+---------------------------------------+



In [286]:
spark.sql("select * from flights_csv").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [287]:
spark.sql("CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights")
spark.sql("CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights")

DataFrame[]

### _Creating Spark Managed Partitioned Table Example_

In [288]:
spark.sql\
(
"""
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights
"""
)

DataFrame[]

In [289]:
spark.sql("select * from partitioned_flights").show(3)

+-------------------+-----+-----------------+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|
+-------------------+-----+-----------------+
|            Romania|   15|    United States|
|            Croatia|    1|    United States|
|            Ireland|  344|    United States|
+-------------------+-----+-----------------+
only showing top 3 rows



In [290]:
spark.sql("describe partitioned_flights").show(truncate=False)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|ORIGIN_COUNTRY_NAME    |string   |null   |
|count                  |bigint   |null   |
|DEST_COUNTRY_NAME      |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|DEST_COUNTRY_NAME      |string   |null   |
+-----------------------+---------+-------+



In [291]:
spark.sql("show PARTITIONS partitioned_flights").show(3, truncate=False)

+--------------------------+
|partition                 |
+--------------------------+
|DEST_COUNTRY_NAME=Algeria |
|DEST_COUNTRY_NAME=Angola  |
|DEST_COUNTRY_NAME=Anguilla|
+--------------------------+
only showing top 3 rows



### _Creating External Table Example_

In [292]:
spark.sql\
(
"""
CREATE EXTERNAL TABLE hive_flights (DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/Users/grp/sparkTheDefinitiveGuide/data/flight-data-hive/'
"""
)

DataFrame[]

In [293]:
spark.sql("show tables").show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|sparksql|            flights|      false|
|sparksql|        flights_csv|      false|
|sparksql|flights_from_select|      false|
|sparksql|       hive_flights|      false|
|sparksql|partitioned_flights|      false|
|        |          complexdf|       true|
|        |         datatable2|       true|
|        |          datetable|       true|
|        |           dfnonull|       true|
|        |            dftable|       true|
|        |         dfwithdate|       true|
|        |    graduateprogram|       true|
|        |             person|       true|
|        |      some_sql_view|       true|
|        |        sparkstatus|       true|
+--------+-------------------+-----------+



### _Insert and Insert Partition Example_

In [294]:
spark.sql\
(
"""
INSERT INTO flights_from_select SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights
"""
)

DataFrame[]

In [295]:
spark.sql\
(
"""
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT count, ORIGIN_COUNTRY_NAME FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES'
"""
)

DataFrame[]

### _Metadata Example_

In [296]:
spark.sql("REFRESH table partitioned_flights")
spark.sql("MSCK REPAIR TABLE partitioned_flights")

DataFrame[]

### _Dropping Tables Example_

In [297]:
'''
spark.sql("DROP TABLE flights")
spark.sql("DROP TABLE IF EXISTS flights")
'''

'\nspark.sql("DROP TABLE flights")\nspark.sql("DROP TABLE IF EXISTS flights")\n'

### _Cache/UnCache Tables Example_

In [298]:
'''
spark.sql("CACHE TABLE flights")
spark.sql("UNCACHE TABLE flights")
'''

'\nspark.sql("CACHE TABLE flights")\nspark.sql("UNCACHE TABLE flights")\n'

### _Views Example_

In [299]:
spark.sql("""
CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

spark.sql("""
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'
""")

DataFrame[]

In [300]:
spark.sql("show tables").show()

+--------+-------------------+-----------+
|database|          tableName|isTemporary|
+--------+-------------------+-----------+
|sparksql|            flights|      false|
|sparksql|        flights_csv|      false|
|sparksql|flights_from_select|      false|
|sparksql|       hive_flights|      false|
|sparksql|      just_usa_view|      false|
|sparksql|partitioned_flights|      false|
|        |          complexdf|       true|
|        |         datatable2|       true|
|        |          datetable|       true|
|        |           dfnonull|       true|
|        |            dftable|       true|
|        |         dfwithdate|       true|
|        |    graduateprogram|       true|
|        | just_usa_view_temp|       true|
|        |             person|       true|
|        |      some_sql_view|       true|
|        |        sparkstatus|       true|
+--------+-------------------+-----------+



### _Database Example_

In [301]:
'''
spark.sql("show databases").show()
spark.sql("create database example")
spark.sql("use example")
spark.sql("select * from table").show()
spark.sql("select * from example.table").show()
spark.sql("select current_database()").show()
spark.sql("drop database if exists example")
'''

'\nspark.sql("show databases").show()\nspark.sql("create database example")\nspark.sql("use example")\nspark.sql("select * from table").show()\nspark.sql("select * from example.table").show()\nspark.sql("select current_database()").show()\nspark.sql("drop database if exists example")\n'

In [302]:
spark.sql("select current_database()").show()

+------------------+
|current_database()|
+------------------+
|          sparksql|
+------------------+



### _Select Statements Example_

In [303]:
'''
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
    FROM relation[, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]
    [LIMIT num_rows]

named_expression:
    : expression [AS alias]

relation:
    | join_relation
    | (table_name|query|relation) [sample] [AS alias]
    : VALUES (expressions)[, (expressions), ...]
          [AS (column_name[, column_name, ...])]

expressions:
    : expression[, expression, ...]

sort_expressions:
    : expression [ASC|DESC][, expression [ASC|DESC], ...]
'''

'\nSELECT [ALL|DISTINCT] named_expression[, named_expression, ...]\n    FROM relation[, relation, ...]\n    [lateral_view[, lateral_view, ...]]\n    [WHERE boolean_expression]\n    [aggregation [HAVING boolean_expression]]\n    [ORDER BY sort_expressions]\n    [CLUSTER BY expressions]\n    [DISTRIBUTE BY expressions]\n    [SORT BY sort_expressions]\n    [WINDOW named_window[, WINDOW named_window, ...]]\n    [LIMIT num_rows]\n\nnamed_expression:\n    : expression [AS alias]\n\nrelation:\n    | join_relation\n    | (table_name|query|relation) [sample] [AS alias]\n    : VALUES (expressions)[, (expressions), ...]\n          [AS (column_name[, column_name, ...])]\n\nexpressions:\n    : expression[, expression, ...]\n\nsort_expressions:\n    : expression [ASC|DESC][, expression [ASC|DESC], ...]\n'

### _CASE ... WHEN ... THEN Statements Example_

In [304]:
spark.sql("""
SELECT
  CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
       WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
       ELSE -1 END
FROM partitioned_flights
""").show(3)

+--------------------------------------------------------------------------------------------------------+
|CASE WHEN (DEST_COUNTRY_NAME = UNITED STATES) THEN 1 WHEN (DEST_COUNTRY_NAME = Egypt) THEN 0 ELSE -1 END|
+--------------------------------------------------------------------------------------------------------+
|                                                                                                      -1|
|                                                                                                      -1|
|                                                                                                      -1|
+--------------------------------------------------------------------------------------------------------+
only showing top 3 rows



### _Structs Example_

In [305]:
spark\
.sql(\
    """
    CREATE VIEW IF NOT EXISTS nested_data AS
    SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights
    """)
spark.sql("select * from nested_data").show(3, False)
spark.sql("select country.DEST_COUNTRY_NAME, count from nested_data").show(3, False)
spark.sql("select country.*, count from nested_data").show(3, False)

+------------------------+-----+
|country                 |count|
+------------------------+-----+
|[United States, Romania]|15   |
|[United States, Croatia]|1    |
|[United States, Ireland]|344  |
+------------------------+-----+
only showing top 3 rows

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|United States    |15   |
|United States    |1    |
|United States    |344  |
+-----------------+-----+
only showing top 3 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
+-----------------+-------------------+-----+
only showing top 3 rows



### _Lists Example_

In [306]:
spark\
.sql(\
    """
    SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
    collect_set(ORIGIN_COUNTRY_NAME) as origin_set
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)\
.show(3, False)

spark.sql("SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights").show(3, False)

spark\
.sql(\
    """
    SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)\
.show(3, False)

spark\
.sql(\
    """
    CREATE OR REPLACE TEMP VIEW flights_agg AS
    SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
    FROM flights GROUP BY DEST_COUNTRY_NAME
    """)

spark.sql("SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg").show(3, False)

+--------+-------------+---------------+
|new_name|flight_counts|origin_set     |
+--------+-------------+---------------+
|Algeria |[4]          |[United States]|
|Angola  |[15]         |[United States]|
|Austria |[62]         |[United States]|
+--------+-------------+---------------+
only showing top 3 rows

+-----------------+--------------+
|DEST_COUNTRY_NAME|array(1, 2, 3)|
+-----------------+--------------+
|United States    |[1, 2, 3]     |
|United States    |[1, 2, 3]     |
|United States    |[1, 2, 3]     |
+-----------------+--------------+
only showing top 3 rows

+--------+----------------------+
|new_name|collect_list(count)[0]|
+--------+----------------------+
|Algeria |4                     |
|Angola  |15                    |
|Austria |62                    |
+--------+----------------------+
only showing top 3 rows

+---+-----------------+
|col|DEST_COUNTRY_NAME|
+---+-----------------+
|4  |Algeria          |
|15 |Angola           |
|62 |Austria          |
+---+------

### _Function Example_

In [307]:
spark.sql("SHOW FUNCTIONS").show(3) # list of spark functions
spark.sql("SHOW SYSTEM FUNCTIONS").show(3) # spark built in functions
spark.sql("SHOW USER FUNCTIONS").show(3) # user defined functions
spark.sql("SHOW FUNCTIONS 's*'").show(3) # functions that begin with 's'
spark.sql("SHOW FUNCTIONS LIKE 'collect*'").show(3) # functions that contain 'collect'
spark.sql("DESCRIBE FUNCTION collect_list").collect() # describes function

+--------+
|function|
+--------+
|       !|
|       %|
|       &|
+--------+
only showing top 3 rows

+--------+
|function|
+--------+
|       !|
|       %|
|       &|
+--------+
only showing top 3 rows

+--------+
|function|
+--------+
|power3py|
+--------+

+---------+
| function|
+---------+
|   second|
|sentences|
|      sha|
+---------+
only showing top 3 rows

+------------+
|    function|
+------------+
|collect_list|
| collect_set|
+------------+



[Row(function_desc='Function: collect_list'),
 Row(function_desc='Class: org.apache.spark.sql.catalyst.expressions.aggregate.CollectList'),
 Row(function_desc='Usage: collect_list(expr) - Collects and returns a list of non-unique elements.')]

### _Subquery Example_:
-  correlated subqueries
-  uncorrelated subqueries
-  predicate subqueries

In [308]:
# uncorrelated predicate subquery
print("uncorrelated predicate subquery:")

spark\
.sql(\
    """
    SELECT dest_country_name FROM flights
        GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5
    """)\
.show(5, False)

spark\
.sql(\
    """
    SELECT * FROM flights WHERE origin_country_name IN 
        (SELECT dest_country_name FROM flights GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)
    """)\
.show(3, False)

# correlated predicate subquery
print("correlated predicate subquery:")

spark\
.sql(\
    """
    SELECT * FROM flights f1
    WHERE EXISTS (SELECT 1 FROM flights f2
                WHERE f1.dest_country_name = f2.origin_country_name)
    AND EXISTS (SELECT 1 FROM flights f2
                WHERE f2.dest_country_name = f1.origin_country_name)
    """)\
.show(3, False)

# uncorrelated scalar queries
print("uncorrelated scalar queries:")

spark.sql("SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights").show(3, False)

uncorrelated predicate subquery:
+-----------------+
|dest_country_name|
+-----------------+
|United States    |
|Canada           |
|Mexico           |
|United Kingdom   |
|Japan            |
+-----------------+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|Egypt            |United States      |15   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
+-----------------+-------------------+-----+
only showing top 3 rows

correlated predicate subquery:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
+-----------------+-------------------+-----+
only showing top 3 rows

uncorrelated scalar queries:
+-----------------+-------------

### Spark SQL Configuration Example

In [309]:
spark.sql("SET spark.sql.shuffle.partitions=8")

DataFrame[key: string, value: string]

## _Chapter #11 - Datasets_

-  solely JVM language feature that only works with Scala and Java programming language
-  ability to define the object that each row in your Dataset will consist of
-  Scala:
    -  define case class object that holds schema
    -  Case Class:
        -  immutable
        -  comparison by structure (schema) instead of value
-  Java:
    -  define Java Bean
-  Spark converts the Spark Row format to the object specified (Scala case class or Java class)

### _Chapter #11 Exercises (Scala/Java Datasets)_

In [310]:
'''
case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)


// COMMAND ----------

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]


// COMMAND ----------

flights.show(2)


// COMMAND ----------

flights.first.DEST_COUNTRY_NAME // United States


// COMMAND ----------

def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}


// COMMAND ----------

flights.filter(flight_row => originIsDestination(flight_row)).first()


// COMMAND ----------

flights.collect().filter(flight_row => originIsDestination(flight_row))


// COMMAND ----------

val destinations = flights.map(f => f.DEST_COUNTRY_NAME)


// COMMAND ----------

val localDestinations = destinations.take(5)


// COMMAND ----------

case class FlightMetadata(count: BigInt, randomData: BigInt)

val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
  .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
  .as[FlightMetadata]


// COMMAND ----------

val flights2 = flights
  .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))


// COMMAND ----------

flights2.selectExpr("_1.DEST_COUNTRY_NAME")


// COMMAND ----------

flights2.take(2)


// COMMAND ----------

val flights2 = flights.join(flightsMeta, Seq("count"))


// COMMAND ----------

val flights2 = flights.join(flightsMeta.toDF(), Seq("count"))


// COMMAND ----------

flights.groupBy("DEST_COUNTRY_NAME").count()


// COMMAND ----------

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()


// COMMAND ----------

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count().explain


// COMMAND ----------

def grpSum(countryName:String, values: Iterator[Flight]) = {
  values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)


// COMMAND ----------

def grpSum2(f:Flight):Integer = {
  1
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)


// COMMAND ----------

def sum2(left:Flight, right:Flight) = {
  Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
  .take(5)


// COMMAND ----------

flights.groupBy("DEST_COUNTRY_NAME").count().explain


// COMMAND ----------

'''

'\ncase class Flight(DEST_COUNTRY_NAME: String,\n                  ORIGIN_COUNTRY_NAME: String, count: BigInt)\n\n\n// COMMAND ----------\n\nval flightsDF = spark.read\n  .parquet("/data/flight-data/parquet/2010-summary.parquet/")\nval flights = flightsDF.as[Flight]\n\n\n// COMMAND ----------\n\nflights.show(2)\n\n\n// COMMAND ----------\n\nflights.first.DEST_COUNTRY_NAME // United States\n\n\n// COMMAND ----------\n\ndef originIsDestination(flight_row: Flight): Boolean = {\n  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME\n}\n\n\n// COMMAND ----------\n\nflights.filter(flight_row => originIsDestination(flight_row)).first()\n\n\n// COMMAND ----------\n\nflights.collect().filter(flight_row => originIsDestination(flight_row))\n\n\n// COMMAND ----------\n\nval destinations = flights.map(f => f.DEST_COUNTRY_NAME)\n\n\n// COMMAND ----------\n\nval localDestinations = destinations.take(5)\n\n\n// COMMAND ----------\n\ncase class FlightMetadata(count: BigInt, randomData

### grp