In [6]:
from __future__ import print_function
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType


In [7]:
#Create the Spark Session for this notebook (getOrCreate reuses an active one if present)
spark = (
    SparkSession.builder
    .appName("SampleSparkSQLApp")
    .getOrCreate()
)

In [13]:
#Create a sample DF with 10 rows holding the numbers 0 through 9 (range's end is exclusive)
myRange = spark.range(10).toDF("number")

In [14]:
#Print the schema (spark.range produces a single non-nullable long column)
myRange.printSchema()

root
 |-- number: long (nullable = false)



In [15]:
#Display the DF's rows (show() is the action that actually runs the job)
myRange.show(n=20, truncate=True)

[Stage 0:>                                                          (0 + 1) / 2]

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+



                                                                                

In [8]:
#Read the 2015 flight summary CSV into a DF.
#The path can be overridden with the FLIGHT_DATA_PATH environment variable
#instead of being hardcoded to one machine's layout.
FLIGHT_DATA_PATH = os.environ.get(
    "FLIGHT_DATA_PATH",
    "/home/cdsw/resources/flight-data/csv/2015-summary.csv",
)
#Explicit schema (same columns/types inferSchema reported) avoids the extra
#pass over the file that inferSchema triggers and pins the column types.
flightSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True),
])
flightData2015 = spark\
  .read\
  .schema(flightSchema)\
  .option("header", "true")\
  .csv(FLIGHT_DATA_PATH)

                                                                                

In [4]:
#Print the flight DF's schema (column names, types and nullability)
flightData2015.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [9]:
#Register the DF as the temporary view "flight_data_2015" so spark.sql can query it.
#No data is copied into a table: the view is just a name for the DF's plan, so
#queries against it still scan the source CSV.
flightData2015.createOrReplaceTempView("flight_data_2015")

In [31]:
#Fetch the first 3 rows to the driver; the cell displays them as a list of Row objects
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [37]:
#Per-destination row count expressed in SQL against the temp view.
#This only builds the query plan; nothing runs until an action such as show().
sqlWayQuery = """
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
"""
sqlWay = spark.sql(sqlWayQuery)

In [38]:
#The same per-destination count built with the DataFrame API instead of SQL
#(its result column is named "count" rather than "count(1)")
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [39]:
#Print the physical plan of the SQL version ("simple" is the mode a bare explain() uses)
sqlWay.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#70], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#70, 200), ENSURE_REQUIREMENTS, [plan_id=185]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#70], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#70] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/cdsw/resources/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [40]:
#Print the physical plan of the DataFrame-API version ("simple" is the default explain mode)
dataFrameWay.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#70], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#70, 200), ENSURE_REQUIREMENTS, [plan_id=198]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#70], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#70] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/cdsw/resources/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [41]:
#Execute the SQL query and display the first 20 rows (long strings truncated)
sqlWay.show(n=20, truncate=True)

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|            Anguilla|       1|
|              Russia|       1|
|            Paraguay|       1|
|             Senegal|       1|
|              Sweden|       1|
|            Kiribati|       1|
|              Guyana|       1|
|         Philippines|       1|
|            Djibouti|       1|
|            Malaysia|       1|
|           Singapore|       1|
|                Fiji|       1|
|              Turkey|       1|
|                Iraq|       1|
|             Germany|       1|
|              Jordan|       1|
|               Palau|       1|
|Turks and Caicos ...|       1|
|              France|       1|
|              Greece|       1|
+--------------------+--------+
only showing top 20 rows



In [42]:
#Largest single "count" value in the flight data.
#Import the functions module under an alias rather than `from ... import max`,
#which would shadow Python's builtin max for every later cell in the notebook.
from pyspark.sql import functions as F
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [44]:
#Top 5 destination countries by total flight count, expressed in SQL
topDestinationsQuery = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
maxSql = spark.sql(topDestinationsQuery)

In [45]:
#Execute the top-5 SQL query and display its rows
maxSql.show(n=20, truncate=True)


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [10]:
#Find the top 5 destination countries with the DataFrame API.
#agg(...).alias(...) names the total directly instead of relying on Spark's
#auto-generated "sum(count)" column name and renaming it afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .agg(F.sum("count").alias("destination_total"))\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

[Stage 2:>                                                          (0 + 1) / 1]

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



                                                                                

In [11]:
#Physical plan of the DataFrame-API top-5 query.
#Imports are repeated here so this cell does not depend on another cell having run.
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .agg(F.sum("count").alias("destination_total"))\
  .sort(desc("destination_total"))\
  .limit(5)\
  .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#76L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#38,destination_total#76L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[sum(count#40)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#38, 200), ENSURE_REQUIREMENTS, [plan_id=115]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#38], functions=[partial_sum(count#40)])
            +- FileScan csv [DEST_COUNTRY_NAME#38,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/cdsw/resources/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [5]:
#Stop the SparkSession (and its underlying SparkContext); run this last, since later
#cells would need a new session
spark.stop()