# Spark II - Dataframes and SQL

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("Spark SQL Course")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/04 10:24:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# `DataFrame`

In [2]:
from pyspark.sql import Row

row1 = Row(name="John", age=21)
row2 = Row(name="James", age=32)
row3 = Row(name="Jane", age=18)
row1['name']

'John'

In [3]:
df = spark.createDataFrame([row1, row2, row3])

In [4]:
df.collect()

                                                                                

[Row(name='John', age=21), Row(name='James', age=32), Row(name='Jane', age=18)]

In [5]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [6]:
df.show()

                                                                                

+-----+---+
| name|age|
+-----+---+
| John| 21|
|James| 32|
| Jane| 18|
+-----+---+



In [7]:
print(df.rdd.toDebugString().decode("utf-8"))

(8) MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[11] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  SQLExecutionRDD[10] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
 |  MapPartitionsRDD[3] at map at SerDeUtil.scala:69 []
 |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:117 []
 |  PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []


In [8]:
df.rdd.getNumPartitions()

8

## Creating dataframes

In [9]:
rows = [
    Row(name="John", age=21, gender="male"),
    Row(name="James", age=25, gender="female"),
    Row(name="Albert", age=46, gender="male")
]
df = spark.createDataFrame(rows)
df.show()

                                                                                

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



                                                                                

In [10]:
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["James", 25, "female"],
    ["Albert", 46, "male"]
]
df = spark.createDataFrame(rows, column_names)
df.show()

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



                                                                                

In [11]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [12]:
column_names = ["name", "age", "gender"]
rdd = sc.parallelize([
    ("John", 21, "male"),
    ("James", 25, "female"),
    ("Albert", 46, "male")
])
df = spark.createDataFrame(rdd, column_names)
df.show()

                                                                                

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 21|  male|
| James| 25|female|
|Albert| 46|  male|
+------+---+------+



Dataframe to rdd and vice versa

In [13]:
rdd2 = df.rdd


In [14]:
rdd3 = rdd2.map(lambda x: [x[0],x[1]+10,x[2]])

In [15]:
df = rdd3.toDF(["name","age","gender"])

                                                                                

In [16]:
df.show()

                                                                                

+------+---+------+
|  name|age|gender|
+------+---+------+
|  John| 31|  male|
| James| 35|female|
|Albert| 56|  male|
+------+---+------+



                                                                                

## Schema

In [17]:
df.schema

StructType([StructField('name', StringType(), True), StructField('age', LongType(), True), StructField('gender', StringType(), True)])

In [18]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [19]:
type(df.schema)

pyspark.sql.types.StructType

In [21]:
from pyspark.sql.types import *

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])
rows = [("John", 21, "male")]
df = spark.createDataFrame(rows, schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



# Queries

In [22]:
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["Jane", 25, "female"]
]
df = spark.createDataFrame(rows, column_names)

# Create a temporary view from the DataFrame
df.createOrReplaceTempView("new_view")

# Apply the query
query = "SELECT name, age FROM new_view WHERE gender='male'"
men_df = spark.sql(query)
men_df.show()

                                                                                

+----+---+
|name|age|
+----+---+
|John| 21|
+----+---+



## `SELECT`

In [23]:
df.createOrReplaceTempView("table")
query = "SELECT name, age FROM table"
spark.sql(query).show()

                                                                                

+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+



                                                                                

In [24]:
df.select("name", "age").show()

+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+



## `WHERE`

In [25]:
df.createOrReplaceTempView("table")
query = "SELECT * FROM table WHERE age > 21"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [26]:
df = df.where("age > 21")

In [27]:
df.show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



# Alternatively:
df.where(df['age'] > 21).show()

In [28]:
df.where(df.age > 21).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [29]:
df.select("*").where("age > 21").show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



## `LIMIT`

In [30]:
df.createOrReplaceTempView("table")
query = "SELECT * FROM table LIMIT 1"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+



In [24]:
df.limit(1).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



In [25]:
df.select("*").limit(1).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21|  male|
+----+---+------+



## `ORDER BY`

In [26]:
df.createOrReplaceTempView("table")
query = "SELECT * FROM table ORDER BY name ASC"
spark.sql(query).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21|  male|
+----+---+------+



In [27]:
df.orderBy(df.name.asc()).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21|  male|
+----+---+------+



## `ALIAS`

In [28]:
df.createOrReplaceTempView("table")
query = "SELECT name, age, gender AS sex FROM table"
spark.sql(query).show()

+----+---+------+
|name|age|   sex|
+----+---+------+
|John| 21|  male|
|Jane| 25|female|
+----+---+------+



In [29]:
df.select(df.name, df.age, df.gender.alias('sex')).show()

+----+---+------+
|name|age|   sex|
+----+---+------+
|John| 21|  male|
|Jane| 25|female|
+----+---+------+



## `CAST`

In [30]:
df.createOrReplaceTempView("table")
query = "SELECT name, cast(age AS float) AS age_f FROM table"
spark.sql(query).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



In [31]:
df.select(df.name, df.age.cast("float").alias("age_f")).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



In [32]:
new_age_col = df.age.cast("float").alias("age_f")
df.select(df.name, new_age_col).show()

+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+



## Adding new columns

In [33]:
df.createOrReplaceTempView("table")
query = "SELECT *, 12*age AS age_months FROM table"
spark.sql(query).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



In [34]:
df.withColumn("age_months", df.age * 12).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



In [35]:
df.select("*", (df.age * 12).alias("age_months")).show()

+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21|  male|       252|
|Jane| 25|female|       300|
+----+---+------+----------+



# Column functions

## Numeric functions examples

In [36]:
from pyspark.sql import functions as fn

columns = ["brand", "cost"]
df = spark.createDataFrame([
    ("garnier", 3.49),
    ("elseve", 2.71)
], columns)

round_cost = fn.round(df.cost, 1)
floor_cost = fn.floor(df.cost)
ceil_cost = fn.ceil(df.cost)

df.withColumn('round', round_cost)\
    .withColumn('floor', floor_cost)\
    .withColumn('ceil', ceil_cost)\
    .show()

+-------+----+-----+-----+----+
|  brand|cost|round|floor|ceil|
+-------+----+-----+-----+----+
|garnier|3.49|  3.5|    3|   4|
| elseve|2.71|  2.7|    2|   3|
+-------+----+-----+-----+----+



## Date functions examples

In [38]:
from datetime import date
from pyspark.sql import functions as fn

df = spark.createDataFrame([
    (date(2015, 1, 1), date(2015, 1, 15)),
    (date(2015, 2, 21), date(2015, 3, 8)),
], ["start_date", "end_date"])

days_between = fn.datediff(df.end_date, df.start_date)
start_month = fn.month(df.start_date)

df.withColumn('days_between', days_between)\
    .withColumn('start_month', start_month)\
    .show()

+----------+----------+------------+-----------+
|start_date|  end_date|days_between|start_month|
+----------+----------+------------+-----------+
|2015-01-01|2015-01-15|          14|          1|
|2015-02-21|2015-03-08|          15|          2|
+----------+----------+------------+-----------+



## User-defined functions

In [40]:
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType

df = spark.createDataFrame([(1, 3), (4, 2)], ["first", "second"])

def my_func(col_1, col_2):
    if (col_1 > col_2):
        return "{} is bigger than {}".format(col_1, col_2)
    else:
        return "{} is bigger than {}".format(col_2, col_1)

my_udf = fn.udf(my_func, StringType())

df.withColumn("udf", my_udf(df['first'], df['second'])).show()

+-----+------+------------------+
|first|second|               udf|
+-----+------+------------------+
|    1|     3|3 is bigger than 1|
|    4|     2|4 is bigger than 2|
+-----+------+------------------+



# Agregations

## Examples using the Dataframe API

In [47]:
from pyspark.sql import functions as fn

products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'mouse', 'microsoft', 59.99),
    ('3', 'keyboard', 'microsoft', 59.99),
    ('4', 'keyboard', 'logitech', 59.99),
    ('5', 'mouse', 'logitech', 29.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

products.groupBy('prod_cat').avg('prod_value').show()

+--------+-----------------+
|prod_cat|  avg(prod_value)|
+--------+-----------------+
|keyboard|            59.99|
|   mouse|43.32333333333333|
+--------+-----------------+



In [48]:
products.groupBy('prod_cat').agg(fn.avg('prod_value')).show()

+--------+-----------------+
|prod_cat|  avg(prod_value)|
+--------+-----------------+
|keyboard|            59.99|
|   mouse|43.32333333333333|
+--------+-----------------+



In [49]:
from pyspark.sql import functions as fn

products.groupBy('prod_brand', 'prod_cat')\
    .agg(fn.avg('prod_value')).show()

+----------+--------+---------------+
|prod_brand|prod_cat|avg(prod_value)|
+----------+--------+---------------+
| microsoft|   mouse|          49.99|
|  logitech|keyboard|          59.99|
| microsoft|keyboard|          59.99|
|  logitech|   mouse|          29.99|
+----------+--------+---------------+



In [32]:
from pyspark.sql import functions as fn

In [50]:
from pyspark.sql import functions as fn

products.groupBy('prod_brand').agg(
    fn.round(fn.avg('prod_value'), 1).alias('average'),
    fn.ceil(fn.sum('prod_value')).alias('sum'),
    fn.min('prod_value').alias('min')
).show()

+----------+-------+---+-----+
|prod_brand|average|sum|  min|
+----------+-------+---+-----+
|  logitech|   45.0| 90|29.99|
| microsoft|   53.3|160|39.99|
+----------+-------+---+-----+



## Example using a query

In [51]:
products.createOrReplaceTempView("products")

query = """
SELECT
prod_brand,
round(avg(prod_value), 1) AS average,
min(prod_value) AS min
FROM products
GROUP BY prod_brand
"""

spark.sql(query).show()

+----------+-------+-----+
|prod_brand|average|  min|
+----------+-------+-----+
|  logitech|   45.0|29.99|
| microsoft|   53.3|39.99|
+----------+-------+-----+



More Examples - Reading from a csv file

In [31]:
df4 = spark.read.options(header='True',inferSchema='True') \
  .csv("netflix-titles.csv")

                                                                                

In [32]:
df4.createOrReplaceTempView("table")
query= "SELECT * FROM table"
spark.sql(query).show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                NULL|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|                NULL|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglan

In [33]:
df4.createOrReplaceTempView("table")
query= "SELECT title FROM table ORDER BY title ASC"
spark.sql(query).show()

+--------------------+
|               title|
+--------------------+
|                NULL|
|                NULL|
|"Behind ""The Cov...|
|"Escape from the ...|
|"Gabriel ""Fluffy...|
|"Waiting for ""Su...|
|              #Alive|
|#AnneFrank - Para...|
|   #FriendButMarried|
| #FriendButMarried 2|
|               #Roxy|
|           #Rucker50|
|             #Selfie|
|          #Selfie 69|
|            #blackAF|
|    #cats_the_mewvie|
|        #realityhigh|
|                 '76|
|                 '89|
|            (T)ERROR|
+--------------------+
only showing top 20 rows



                                                                                

- How many movies were released in 2018

In [47]:
df4.createOrReplaceTempView("table")
query= "SELECT count(type) AS movie_count FROM table WHERE type= 'Movie' AND release_year= 2018"
spark.sql(query).show()

+-----------+
|movie_count|
+-----------+
|        765|
+-----------+



- which country produced the most movies

In [53]:
df4.createOrReplaceTempView("table")
query= "SELECT country, count(title) AS movie_count FROM table WHERE type= 'Movie' GROUP BY country ORDER BY movie_count DESC"
spark.sql(query).show()

+--------------------+-----------+
|             country|movie_count|
+--------------------+-----------+
|       United States|       2047|
|               India|        893|
|                NULL|        440|
|      United Kingdom|        206|
|              Canada|        122|
|               Spain|         97|
|               Egypt|         92|
|             Nigeria|         85|
|           Indonesia|         77|
|              Turkey|         76|
|               Japan|         76|
|              France|         74|
|         Philippines|         73|
|              Mexico|         70|
|United Kingdom, U...|         63|
|United States, Ca...|         51|
|              Brazil|         50|
|           Hong Kong|         50|
|             Germany|         47|
|         South Korea|         41|
+--------------------+-----------+
only showing top 20 rows



Practice - 

- How many tv shows lasted 1 season
- which year had the least number of tv shows produced
- when was the earliest release date for a movie in the dataset. 

In [61]:
df4.createOrReplaceTempView("table")
query= "SELECT count(*) AS count FROM table WHERE type='TV Show'AND duration= '1 Season'"
spark.sql(query).show()

+-----+
|count|
+-----+
| 1791|
+-----+



In [79]:
df4.createOrReplaceTempView("table")
query= "SELECT release_year, COUNT(title) AS tv_show_count FROM table WHERE type= 'TV Show' GROUP BY release_year ORDER BY tv_show_count ASC"
spark.sql(query).show()

+-----------------+-------------+
|     release_year|tv_show_count|
+-----------------+-------------+
|             1967|            1|
|             1972|            1|
|             1977|            1|
|December 15, 2020|            1|
|             1981|            1|
|             1974|            1|
|             1946|            1|
|             1989|            1|
|             1963|            1|
|             1925|            1|
|    Nse Ikpe-Etim|            1|
|      Jade Eshete|            1|
|             1985|            1|
|             1979|            1|
|             1991|            1|
|             1945|            1|
|             1988|            2|
|             1995|            2|
|             1994|            2|
|             1986|            2|
+-----------------+-------------+
only showing top 20 rows



In [85]:
df4.createOrReplaceTempView("table")
query= "SELECT DISTINCT (release_year) FROM table"
spark.sql(query).show()

+-----------------+
|     release_year|
+-----------------+
|     Ted Ferguson|
|             1987|
|             1956|
|             2016|
|             2020|
|             2012|
|             1958|
|           40 min|
|             1943|
|             1972|
| Marquell Manning|
|             1988|
|             2019|
|             2017|
|             1977|
|             2014|
|             1971|
|             1984|
|             2013|
|             1982|
+-----------------+
only showing top 20 rows



In [90]:
df4.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [92]:
df4.select("release_year").distinct().show(50)

+-----------------+
|     release_year|
+-----------------+
|     Ted Ferguson|
|             1987|
|             1956|
|             2016|
|             2020|
|             2012|
|             1958|
|           40 min|
|             1943|
|             1972|
| Marquell Manning|
|             1988|
|             2019|
|             2017|
|             1977|
|             2014|
|             1971|
|             1984|
|             2013|
|             1982|
|             2005|
|             2000|
|             1965|
|             1962|
|             1954|
|   Charles Rocket|
|December 15, 2020|
|             1981|
|   Peter Ferriero|
|             1978|
|             1974|
|             2002|
|             1959|
|       Paul Sambo|
|             2018|
|             2009|
|    United States|
|             1995|
|             1964|
|          Dr. Dre|
|             1946|
|             2006|
|       Nick Kroll|
|             1976|
|     Imanol Arias|
|             1942|
|             1947|


In [93]:
df4.createOrReplaceTempView("table")
query= "SELECT DISTINCT(release_year) FROM table WHERE release_year IS NOT NULL AND release_year RLIKE '^[0-9]+$' AND type= 'Movie' ORDER BY CAST(release_year AS INT) ASC LIMIT 1"
spark.sql(query).show()

+------------+
|release_year|
+------------+
|        1942|
+------------+

