# SparkSession

A SparkSession is created automatically when you start a notebook (e.g. Zeppelin, Databricks) and is available as the variable `spark`.

<img src="https://i.imgur.com/5Ai45fb.jpg" width=500px>

In [1]:
%spark
// Scala SparkSession: evaluate the pre-created `spark` variable to confirm the session is available
spark

In [2]:
%spark.pyspark
# PySpark SparkSession: evaluate the pre-created `spark` variable to confirm the session is available
spark

 

# Show DataFrame

`df.show()` is the Spark native API that displays data but it's not pretty. 

`z.show(df)` is a Zeppelin built-in feature that renders a DataFrame result as a formatted table.

In [4]:
%spark.pyspark

# List all Hive tables: run `show tables` through Spark SQL and keep the result as a DataFrame
tables_df = spark.sql("show tables")

In [5]:
%spark.pyspark

# Spark's native display: plain-text table printed to the paragraph output
tables_df.show()

In [6]:
%spark.pyspark
# Zeppelin's display helper: same DataFrame, rendered by the notebook instead of as plain text
z.show(tables_df)

 

# Spark SQL vs Dataframe

`%sql` is the Spark SQL interpreter

`%spark.pyspark` is the PySpark interpreter

`%spark` is the Spark Scala interpreter

In [8]:
%sql

-- Number of rows in the wdi_csv_parquet table
select count(1) from wdi_csv_parquet

In [9]:
%spark.pyspark

# Read the Hive table into a DataFrame (lazy: nothing is computed yet)
# and ask Spark to cache it for fast future access.
wdi_df = spark.sql("SELECT * from wdi_csv_parquet").cache()
wdi_df.printSchema()

# count() is a Spark action, so it is evaluated eagerly
wdi_row_count = wdi_df.count()
z.show(wdi_row_count)

 

# Show Historical GDP for Canada

- Re-write the hive query (left cell) using PySpark df

In [11]:
%sql
-- Reference query: yearly GDP indicator values for Canada, oldest year first.
-- The indicator value is exposed under the column name GDP.
SELECT year, IndicatorValue as GDP
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' and countryName = 'Canada'
ORDER BY year


In [12]:
%spark.pyspark
# Approach 1: run the reference query through spark.sql.
# The value column is aliased to GDP so the result has the same columns as the %sql cell;
# the DataFrame already holds exactly (year, GDP), so no extra select is needed before display.
wdi_canada_df1 = spark.sql(
    "SELECT year, indicatorValue AS GDP "
    "FROM wdi_csv_parquet "
    "WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND countryName = 'Canada' "
    "ORDER BY year"
)

# Use z.show to display the result and draw a bar chart
z.show(wdi_canada_df1)

In [13]:
%spark.pyspark
# Approach 2: the same query expressed with the DataFrame API.
# Filter first, while the indicator/country columns are still part of the frame,
# then project; the value column is aliased to GDP to match the reference %sql cell.
wdi_canada_df2 = (
    wdi_df
    .filter((wdi_df.indicatorcode == "NY.GDP.MKTP.KD.ZG") & (wdi_df.countryname == "Canada"))
    .select("year", wdi_df.indicatorvalue.alias("GDP"))
    .orderBy("year")
)
z.show(wdi_canada_df2)

 

# Show GDP for Each Country and Sort By Year

- Re-write the hive query (left cell) using PySpark df  
    - hint: you can create multiple DFs 

In [15]:
%sql
-- Reference query: GDP indicator rows for every country.
-- DISTRIBUTE BY sends all rows of a country to the same partition;
-- SORT BY then orders rows inside each partition (it is not a global sort).
SELECT countryname,
       year,
       indicatorcode,
       indicatorvalue
FROM wdi_csv_parquet
WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG'
DISTRIBUTE BY countryname
SORT BY countryname, year


In [16]:
%spark.pyspark
# Approach 1: DataFrame API equivalent of the reference query.
#   DISTRIBUTE BY countryname      -> repartition("countryName")
#   SORT BY countryname, year      -> sortWithinPartitions("countryName", "year")
# A global orderBy() here would shuffle the data again and throw away the
# repartition by country, so it is not the equivalent of SORT BY.
wdi_gdp_df1 = (
    wdi_df
    .select("countryName", "year", "indicatorCode", "indicatorValue")
    .filter(wdi_df['indicatorCode'] == 'NY.GDP.MKTP.KD.ZG')
    .repartition("countryName")
    .sortWithinPartitions("countryName", "year")
)
z.show(wdi_gdp_df1)

In [17]:
%spark.pyspark
# Approach 2: hand the reference query to spark.sql.
# Adjacent string literals are concatenated into one single-line statement.
gdp_by_country_query = (
    "SELECT countryName, year, indicatorCode, indicatorValue "
    "FROM wdi_csv_parquet "
    "WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' "
    "DISTRIBUTE BY countryName "
    "SORT BY countryName, year"
)
wdi_gdp_df2 = spark.sql(gdp_by_country_query)
z.show(wdi_gdp_df2)


# Find the highest GDP for each country

- Re-write the hive query (left cell) using PySpark df


In [19]:
%sql

-- Highest non-zero GDP indicator value per country, with the year it was recorded.
-- t1 holds each country's maximum; joining back to the table recovers the year.
SELECT wdi_csv_parquet.indicatorvalue AS value,
       wdi_csv_parquet.year           AS year,
       wdi_csv_parquet.countryname    AS country
FROM   (SELECT Max(indicatorvalue) AS ind,
               countryname
        FROM   wdi_csv_parquet
        WHERE  indicatorcode = 'NY.GDP.MKTP.KD.ZG'
               AND indicatorvalue <> 0
        GROUP  BY countryname) t1
       INNER JOIN wdi_csv_parquet
               ON t1.ind = wdi_csv_parquet.indicatorvalue
                  AND t1.countryname = wdi_csv_parquet.countryname
                  -- Only GDP rows may match: without this, a row of another indicator
                  -- with the same value for the same country would also be returned.
                  AND wdi_csv_parquet.indicatorcode = 'NY.GDP.MKTP.KD.ZG'


In [20]:
%spark.pyspark
# Approach 1: DataFrame API
from pyspark.sql.functions import max as sparkMax

# Step 1: highest non-zero GDP indicator value per country.
# The columns are aliased (ind / countryNameInner) so they cannot be confused
# with wdi_df's own columns when joining back to wdi_df below.
wdi_high_gdp_df_inner = (
    wdi_df
    .filter((wdi_df.indicatorcode == "NY.GDP.MKTP.KD.ZG") & (wdi_df.indicatorvalue != 0))
    .groupBy("countryName")
    .agg(sparkMax("indicatorValue").alias("ind"))
    .select("ind", wdi_df.countryname.alias("countryNameInner"))
)

# Step 2: join back to the full table to recover the year of each maximum.
# The indicator-code condition keeps rows of other indicators that happen to
# carry the same value for the same country out of the result.
high_gdp_join_condition = (
    (wdi_df.indicatorvalue == wdi_high_gdp_df_inner.ind)
    & (wdi_df["countryName"] == wdi_high_gdp_df_inner.countryNameInner)
    & (wdi_df.indicatorcode == "NY.GDP.MKTP.KD.ZG")
)
wdi_high_gdp_joined_df = wdi_df.join(wdi_high_gdp_df_inner, high_gdp_join_condition, "inner")

# Step 3: expose the same column names as the reference query (value, year, country).
wdi_high_gdp_df1 = wdi_high_gdp_joined_df.select(
    wdi_high_gdp_joined_df["ind"].alias("value"),
    wdi_high_gdp_joined_df["year"],
    wdi_high_gdp_joined_df["countryname"].alias("country"),
)
z.show(wdi_high_gdp_df1)

In [21]:
%spark.pyspark
# Approach 2: run the query through spark.sql.
# t1 holds each country's highest non-zero GDP indicator value; joining back to the
# table recovers the year. The final ON condition restricts the joined rows to the
# GDP indicator so that rows of other indicators with an equal value cannot match.
wdi_high_gdp_df2 = spark.sql("""
    SELECT wdi_csv_parquet.indicatorvalue AS value,
           wdi_csv_parquet.year           AS year,
           wdi_csv_parquet.countryname    AS country
    FROM   (SELECT MAX(indicatorvalue) AS ind,
                   countryname
            FROM   wdi_csv_parquet
            WHERE  indicatorcode = 'NY.GDP.MKTP.KD.ZG'
                   AND indicatorvalue != 0
            GROUP  BY countryname) t1
           INNER JOIN wdi_csv_parquet
                   ON t1.ind = wdi_csv_parquet.indicatorvalue
                      AND t1.countryname = wdi_csv_parquet.countryname
                      AND wdi_csv_parquet.indicatorcode = 'NY.GDP.MKTP.KD.ZG'
""")
z.show(wdi_high_gdp_df2)


In [22]:
%spark.pyspark
