# SQL in Spark: The Same Vibes

This notebook shows how to write the same queries in two ways: with PySpark's SQL module (`spark.sql`) and with DataFrame API functions.

In each of the following parts, the SQL query comes first, followed by the equivalent DataFrame API calls.

🔷 This notebook should be run after the _initialisation_nb_ notebook, in the same kernel.

## Viewing the data

In [8]:
spark.sql(
'''
SELECT *
FROM
    newborns
LIMIT 5
'''
).show()

+----+--------+---+----+
|year|    name|sex|freq|
+----+--------+---+----+
|1993|  Abarna|  F|   1|
|1993| Abetare|  F|   1|
|1993|    Abir|  F|   1|
|1993| Abirami|  F|   1|
|1993|Adelaide|  F|   1|
+----+--------+---+----+



In [9]:
df.show(5)

+----+--------+---+----+
|year|    name|sex|freq|
+----+--------+---+----+
|1993|  Abarna|  F|   1|
|1993| Abetare|  F|   1|
|1993|    Abir|  F|   1|
|1993| Abirami|  F|   1|
|1993|Adelaide|  F|   1|
+----+--------+---+----+
only showing top 5 rows



**Once the DataFrame is registered as a table (here, a temporary view) within PySpark, we can also refer to it by name instead of through the DataFrame variable _(df)_.**

In [10]:
# 1)
spark.sql( "SHOW TABLES" ).show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         | newborns|       true|
+---------+---------+-----------+



In [11]:
# 2)
spark.table( 'newborns' ).show(5) 

+----+--------+---+----+
|year|    name|sex|freq|
+----+--------+---+----+
|1993|  Abarna|  F|   1|
|1993| Abetare|  F|   1|
|1993|    Abir|  F|   1|
|1993| Abirami|  F|   1|
|1993|Adelaide|  F|   1|
+----+--------+---+----+
only showing top 5 rows



## Aggregations (AGG): Calculating the overall newborn count in the data

In [12]:
spark.sql(
'''
SELECT
    SUM(freq) AS overall_newborn_count
FROM 
    newborns
'''
).show()

+---------------------+
|overall_newborn_count|
+---------------------+
|               128164|
+---------------------+



In [13]:
df.agg( {'freq': 'sum'} ).show()

+---------+
|sum(freq)|
+---------+
|   128164|
+---------+



**The solution below ⬇️ uses _alias_, the equivalent of "AS" in SQL.**

In [14]:
from pyspark.sql.functions import sum

df.agg( sum('freq').alias('overall_newborn_count') ) \
    .show()

+---------------------+
|overall_newborn_count|
+---------------------+
|               128164|
+---------------------+



## AGG: Analysis of the _year_ column across the whole dataset

In [15]:
spark.sql(
'''
SELECT
    COUNT(DISTINCT year) num_years,
    MIN(year) min_year,
    MAX(year) max_year
FROM 
    newborns
'''
).show()

+---------+--------+--------+
|num_years|min_year|max_year|
+---------+--------+--------+
|       30|    1993|    2022|
+---------+--------+--------+



In [16]:
from pyspark.sql.functions import countDistinct
# note: these two imports shadow Python's built-in max() and min()
from pyspark.sql.functions import max
from pyspark.sql.functions import min

spark.table( 'newborns' ).agg(
    countDistinct( 'year' ).alias( 'num_years' ),
    min( 'year' ).alias( 'min_year' ),
    max( 'year' ).alias( 'max_year' )
).show()

+---------+--------+--------+
|num_years|min_year|max_year|
+---------+--------+--------+
|       30|    1993|    2022|
+---------+--------+--------+



## AGG + Sort: Newborn count per year

In [17]:
spark.sql(
'''
SELECT
    year,
    SUM(freq) AS newborn_count
FROM 
    newborns
GROUP BY
    year
ORDER BY
    year ASC
'''
).tail(5)
# this time, the last 5 rows of the query result are returned via "tail" (as a list of Row objects).

[Row(year=2018, newborn_count=5212),
 Row(year=2019, newborn_count=5134),
 Row(year=2020, newborn_count=5133),
 Row(year=2021, newborn_count=5261),
 Row(year=2022, newborn_count=4538)]

In [18]:
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum

df.groupBy( 'year' ) \
    .agg( sum( 'freq' ).alias( 'newborn_count' ) ) \
    .sort( asc( 'year' ) ) \
    .tail(5)

[Row(year=2018, newborn_count=5212),
 Row(year=2019, newborn_count=5134),
 Row(year=2020, newborn_count=5133),
 Row(year=2021, newborn_count=5261),
 Row(year=2022, newborn_count=4538)]

🔸 **The _tail_ function should be used cautiously: it brings the requested rows into the driver's memory, so asking for a large number of rows can cause an out-of-memory error.**

## AGG + OrderBy: Most popular newborn names over the years in Zurich

In [19]:
spark.sql(
'''
SELECT 
    name,
    SUM(freq) AS freq_count
FROM
    newborns
GROUP BY
    name
ORDER BY
    freq_count DESC
LIMIT 5
'''
).show()

+-----+----------+
| name|freq_count|
+-----+----------+
| Anna|       606|
|David|       599|
|Laura|       494|
| Noah|       470|
| Sara|       460|
+-----+----------+



In [20]:
from pyspark.sql.functions import desc
from pyspark.sql.functions import sum

df.groupBy( 'name' ) \
    .agg( sum('freq').alias('freq_count') ) \
    .orderBy( desc('freq_count') ) \
    .show(5)
# sort and orderBy are aliases, so either one works here.

+-----+----------+
| name|freq_count|
+-----+----------+
| Anna|       606|
|David|       599|
|Laura|       494|
| Noah|       470|
| Sara|       460|
+-----+----------+
only showing top 5 rows



## Union: The most popular male and female newborn names

In [21]:
df_names = \
'''
(
    SELECT
        name,
        sex,
        SUM(freq) AS freq_count
    FROM
        newborns
    WHERE 
        sex = '{}'
    GROUP BY
        name, sex
    ORDER BY
        freq_count DESC
    LIMIT 1
)
'''

spark.sql(
    df_names.format('M') + 'UNION' + df_names.format('F')
).show()

+-----+---+----------+
| name|sex|freq_count|
+-----+---+----------+
|David|  M|       599|
| Anna|  F|       606|
+-----+---+----------+



In [22]:
from pyspark.sql.functions import desc
from pyspark.sql.functions import sum

df_male = df.filter( df.sex == 'M' ) \
                .groupBy( ['name', 'sex'] ) \
                .agg( sum('freq').alias('freq_count') ) \
                .orderBy( desc('freq_count') ).limit(1)

# passing the columns as separate arguments works the same as passing a list (as above).
df_female = df.filter( df.sex == 'F' ) \
                .groupBy( 'name', 'sex' ) \
                .agg( sum('freq').alias('freq_count') ) \
                .orderBy( desc('freq_count') ).limit(1)

df_male.union( df_female ).show()

+-----+---+----------+
| name|sex|freq_count|
+-----+---+----------+
|David|  M|       599|
| Anna|  F|       606|
+-----+---+----------+



Both names also top the overall list of _most popular newborn names_ in the previous part.

## AGG + Filtering + Subquery: Top 3 most popular female names in recent times

_Recent times = the latest (maximum) year in the data_

In [23]:
spark.sql(
'''
SELECT
    name, freq
FROM 
    newborns
WHERE 
    sex = 'F' AND year = ( SELECT MAX(year) FROM newborns )
ORDER BY
    freq DESC
LIMIT 3
'''
).show()

+----+----+
|name|freq|
+----+----+
|Anna|  21|
|Nora|  18|
|Ella|  17|
+----+----+



In [24]:
from pyspark.sql.functions import desc

# "collect()[0][0]" could be used here instead of "head()[0]".
max_year = df.agg( {'year': 'max'} ).head()[0]

# "where" and "filter" are interchangeable here.
df.where( (df.sex == 'F') & (df.year == max_year) ) \
    .select( 'name', 'freq' ) \
    .orderBy( desc('freq') ) \
    .show(3)
# Ending the chain with ".limit(3)" instead of ".show(3)" would not print the records;
# it would just return a new DataFrame.

+----+----+
|name|freq|
+----+----+
|Anna|  21|
|Nora|  18|
|Ella|  17|
+----+----+
only showing top 3 rows



**The same result is also published on the city of Zurich's [statistics page](https://www.stadt-zuerich.ch/content/prd/de/index/statistik/themen/bevoelkerung/geburten-kinder-vornamen/vornamen.html), as shown below.**

In [32]:
# import the Image display helper
from IPython.display import Image
# display the locally stored screenshot
Image( url="img/top_female_names.png", width=500 )