In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
          .master("local") \
          .getOrCreate()

myRange = spark.range(1000).toDF("number")

In [13]:
myRange.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



In [14]:
divisBy2 = myRange.where("number % 2 = 0")

In [15]:
divisBy2.show()

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows



In [16]:
divisBy2.count()

500

In [27]:
flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("../data/2015-summary.csv")


In [28]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [29]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#174 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#174 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=333]
      +- FileScan csv [DEST_COUNTRY_NAME#172,ORIGIN_COUNTRY_NAME#173,count#174] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/Luan Lima/Desktop/github/7_gitspark/pyspark/13_Spark-Th..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [30]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [31]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [32]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [33]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [34]:
sqlWay.show()

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|             Moldova|       1|
|             Bolivia|       1|
|             Algeria|       1|
|Turks and Caicos ...|       1|
|            Pakistan|       1|
|    Marshall Islands|       1|
|            Suriname|       1|
|              Panama|       1|
|         New Zealand|       1|
|             Liberia|       1|
|             Ireland|       1|
|              Zambia|       1|
|            Malaysia|       1|
|               Japan|       1|
|    French Polynesia|       1|
|           Singapore|       1|
|             Denmark|       1|
|               Spain|       1|
|             Bermuda|       1|
|            Kiribati|       1|
+--------------------+--------+
only showing top 20 rows



In [14]:
dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

In [15]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#21], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#21, 5), ENSURE_REQUIREMENTS, [plan_id=55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#21], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/Luan Lima/Desktop/github/7_gitspark/pyspark/13_Spark-Th..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#21], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#21, 5), ENSURE_REQUIREMENTS, [plan_id=68]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#21], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemo

In [17]:
spark.sql("SELECT * from flight_data_2015").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [20]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [22]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [23]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [25]:
from pyspark.sql.functions import desc

In [26]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [35]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#208L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#172,destination_total#208L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#172], functions=[sum(count#174)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#172, 5), ENSURE_REQUIREMENTS, [plan_id=403]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#172], functions=[partial_sum(count#174)])
            +- FileScan csv [DEST_COUNTRY_NAME#172,count#174] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/Luan Lima/Desktop/github/7_gitspark/pyspark/13_Spark-Th..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [10]:
spark.stop()