In [0]:
%%capture
!pip install --upgrade pip
!pip install wget

In [0]:
import wget
from pyspark.sql.functions import col,avg,round,max,lower,upper,lit,ltrim,rtrim,lpad,trim

In [0]:
for i in range(6):
    wget.download('https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/201{}-summary.csv'.format(i),'/tmp/201{}-summary.csv'.format(i))
    dbutils.fs.mv("file:/tmp/201{}-summary.csv".format(i), "dbfs:/airline/201{}-summary.csv".format(i))

In [0]:
df = spark.read.csv('dbfs:/airline',inferSchema=True,header=True)

In [0]:
df.printSchema() 

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [0]:
type(df)

Out[29]: pyspark.sql.dataframe.DataFrame

In [0]:
display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
df.count()

Out[31]: 1502

In [0]:
df.createOrReplaceTempView('flights')

In [0]:
%sql
SELECT * FROM flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
%sql
DROP TABLE IF EXISTS perm_flights;
create or replace table perm_flights AS select * from flights;

num_affected_rows,num_inserted_rows


In [0]:
%sql
CONVERT TO DELTA perm_flights 

In [0]:
%sql
select DEST_COUNTRY_NAME, round(AVG(count),0) AS AVG_QTD_VOOS
from perm_flights
group by 1
order by 1

DEST_COUNTRY_NAME,AVG_QTD_VOOS
Afghanistan,8.0
Algeria,5.0
Angola,13.0
Anguilla,26.0
Antigua and Barbuda,130.0
Argentina,188.0
Aruba,351.0
Australia,294.0
Austria,41.0
Azerbaijan,8.0


In [0]:
display(df
    .groupBy(col('DEST_COUNTRY_NAME'))
    .agg(avg(col('count')).alias('AVG_QTD_VOOS'))
    .orderBy(col('DEST_COUNTRY_NAME'))
    .select(col('DEST_COUNTRY_NAME'),round(col('AVG_QTD_VOOS'),0).alias('AVG_QTD_VOOS'))
)

DEST_COUNTRY_NAME,AVG_QTD_VOOS
Afghanistan,8.0
Algeria,5.0
Angola,13.0
Anguilla,26.0
Antigua and Barbuda,130.0
Argentina,188.0
Aruba,351.0
Australia,294.0
Austria,41.0
Azerbaijan,8.0


In [0]:
%sql
select max(count) from perm_flights

max(count)
370002


In [0]:
display(df
        .select(max(col('count')))
)

max(count)
370002


In [0]:
%sql
select * from perm_flights
where count < 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
Equatorial Guinea,United States,1
Malta,United States,1
Saint Vincent and the Grenadines,United States,1
Slovakia,United States,1
United States,Cyprus,1
United States,Bosnia and Herzegovina,1
United States,Algeria,1
Azerbaijan,United States,1
Liberia,United States,1


In [0]:
display(df
        .filter(col('count')<2)
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
Equatorial Guinea,United States,1
Malta,United States,1
Saint Vincent and the Grenadines,United States,1
Slovakia,United States,1
United States,Cyprus,1
United States,Bosnia and Herzegovina,1
United States,Algeria,1
Azerbaijan,United States,1
Liberia,United States,1


In [0]:
%sql
select count(distinct origin_country_name, dest_country_name) as count_uniq_paises
from perm_flights

count_uniq_paises
320


In [0]:
display(df
        .select(col('origin_country_name'),col('dest_country_name')).distinct().count()
)

320

In [0]:
display(df.sort(col('count')))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Bulgaria,1
Niger,United States,1
Lithuania,United States,1
Moldova,United States,1
United States,French Guiana,1
Burundi,United States,1
Burkina Faso,United States,1
Indonesia,United States,1
United States,Cambodia,1
New Caledonia,United States,1


In [0]:
display(df.orderBy(col('count')))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Malawi,United States,1
United States,Tunisia,1
United States,Greenland,1
Slovakia,United States,1
Kazakhstan,United States,1
United States,Papua New Guinea,1
Kyrgyzstan,United States,1
Mauritania,United States,1
Saint Vincent and the Grenadines,United States,1
Kosovo,United States,1


In [0]:
display(df.orderBy(col('count').desc()))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,United States,370002
United States,United States,358354
United States,United States,352742
United States,United States,348113
United States,United States,347452
United States,United States,343132
United States,Canada,8650
Canada,United States,8514
United States,Canada,8483
Canada,United States,8399


In [0]:
display(df.describe())

summary,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
count,1502,1502,1502.0
mean,,,1718.3189081225032
stddev,,,22300.36861966889
min,Afghanistan,Afghanistan,1.0
max,Zimbabwe,Zimbabwe,370002.0


In [0]:
%sql
select lower(DEST_COUNTRY_NAME),upper(ORIGIN_COUNTRY_NAME)
from perm_flights

lower(DEST_COUNTRY_NAME),upper(ORIGIN_COUNTRY_NAME)
united states,ROMANIA
united states,IRELAND
united states,INDIA
egypt,UNITED STATES
equatorial guinea,UNITED STATES
united states,SINGAPORE
united states,GRENADA
costa rica,UNITED STATES
senegal,UNITED STATES
united states,MARSHALL ISLANDS


In [0]:
display(df.
        select(lower(col('DEST_COUNTRY_NAME')),upper(col('ORIGIN_COUNTRY_NAME')))
)

lower(DEST_COUNTRY_NAME),upper(ORIGIN_COUNTRY_NAME)
united states,ROMANIA
united states,IRELAND
united states,INDIA
egypt,UNITED STATES
equatorial guinea,UNITED STATES
united states,SINGAPORE
united states,GRENADA
costa rica,UNITED STATES
senegal,UNITED STATES
united states,MARSHALL ISLANDS


In [0]:
display(df.
        select(
        ltrim(lit(' hi')),
        rtrim(lit('hello ')),
        trim(lit(' See ya ')),
        lpad(lit('1'),3,'0')
        ).show(1)
)

+----------+-------------+--------------+-------------+
|ltrim( hi)|rtrim(hello )|trim( See ya )|lpad(1, 3, 0)|
+----------+-------------+--------------+-------------+
|        hi|        hello|        See ya|          001|
+----------+-------------+--------------+-------------+
only showing top 1 row



In [0]:
sqlway = spark.sql('select dest_country_name, count(*) from flights group by dest_country_name')

In [0]:
dataframeway = df.groupBy(col('dest_country_name')).count()

In [0]:
sqlway.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dest_country_name#1308], functions=[finalmerge_count(merge count#2710L) AS count(1)#2662L])
   +- Exchange hashpartitioning(dest_country_name#1308, 200), ENSURE_REQUIREMENTS, [id=#2171]
      +- HashAggregate(keys=[dest_country_name#1308], functions=[partial_count(1) AS count#2710L])
         +- FileScan csv [DEST_COUNTRY_NAME#1308] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/airline], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
dataframeway.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dest_country_name#1308], functions=[finalmerge_count(merge count#2715L) AS count(1)#2705L])
   +- Exchange hashpartitioning(dest_country_name#1308, 200), ENSURE_REQUIREMENTS, [id=#2217]
      +- HashAggregate(keys=[dest_country_name#1308], functions=[partial_count(1) AS count#2715L])
         +- FileScan csv [DEST_COUNTRY_NAME#1308] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/airline], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
dataframeway.explain() == sqlway.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dest_country_name#1308], functions=[finalmerge_count(merge count#2715L) AS count(1)#2705L])
   +- Exchange hashpartitioning(dest_country_name#1308, 200), ENSURE_REQUIREMENTS, [id=#2217]
      +- HashAggregate(keys=[dest_country_name#1308], functions=[partial_count(1) AS count#2715L])
         +- FileScan csv [DEST_COUNTRY_NAME#1308] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/airline], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dest_country_name#1308], functions=[finalmerge_count(merge count#2710L) AS count(1)#2662L])
   +- Exchange hashpartitioning(dest_country_name#1308, 200), ENSURE_REQUIREMENTS, [id=#2171]
      +- HashAggregate(keys=[dest_country_name#1308], functions=[partial_count(1) AS count#2710L])
         +- FileScan csv [DEST_COUN