In [10]:
from bs4 import BeautifulSoup
import requests
import os
import datetime
import pandas as pd

In [2]:
# Instalación de Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
#Obtener las versiones de spark la pagina web
url = 'https://downloads.apache.org/spark/' 
r = requests.get(url)
html_doc = r.text
soup = BeautifulSoup(html_doc)

In [4]:
# read the web page and get the lastest versions available
link_files = []
for link in soup.find_all('a'):
  link_files.append(link.get('href'))
spark_link = [x for x in link_files if 'spark' in x]  
ver_spark = spark_link[1][:-1] # get lastest versions of pyspark and drop remove '/' from the string
print(ver_spark)

spark-3.0.3


In [5]:
# intall the wanted version
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [6]:
# Set the proper enviroment
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [7]:
# import libraries
import findspark
from datetime import datetime
findspark.init()

In [8]:
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [9]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

##### Create single column df using list

In [11]:
age_list = [21, 23, 18, 41, 32]

In [12]:
spark.createDataFrame(age_list, schema=IntegerType())

DataFrame[value: int]

In [13]:
names_list = ['Scott', 'Donald', 'Mickey']

In [14]:
spark.createDataFrame(names_list, StringType())

DataFrame[value: string]

##### Create multiple column df using lists

In [15]:
age_list = [(21, ), (23, ), (18, ), (41, ), (32, )]

In [16]:
spark.createDataFrame(age_list, 'age int').show()

+---+
|age|
+---+
| 21|
| 23|
| 18|
| 41|
| 32|
+---+



In [17]:
user_list = [(1, 'Scott'), (2, 'Donald'), (3, 'Mickey'), (4, 'Elvis')]

In [18]:
df = spark.createDataFrame(user_list, 'user_id int, user_first_name string')

In [19]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- user_first_name: string (nullable = true)



In [20]:
df.show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



##### Convert list of dicts into Spark dataframe using Row

In [21]:
user_list_dict = [{'user_id':1, 'user_name':'Scott'}, 
                  {'user_id':2, 'user_name':'Donald'}, 
                  {'user_id':3, 'user_name':'Mickey'}, 
                  {'user_id':4, 'user_name':'Elvis'}]

In [22]:
df_dict = spark.createDataFrame(user_list_dict)



In [23]:
user_list_dict_2 = [Row(*i.values()) for i in user_list_dict]

In [24]:
df_dict_2 = spark.createDataFrame(user_list_dict_2, 'user_id int, user_name string')

In [25]:
df_dict_2.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- user_name: string (nullable = true)



In [26]:
user_list_dict_3 = [Row(**i) for i in user_list_dict]

In [27]:
df_dict_3 = spark.createDataFrame(user_list_dict_2, 'user_id int, user_name string')

In [28]:
df_dict_3.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- user_name: string (nullable = true)



##### Overview basic data types

In [29]:
user = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)}
]

In [30]:
users_df = spark.createDataFrame([Row(**i) for i in user])

In [31]:
users_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: timestamp (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



In [32]:
users_df.show()

+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+



##### Specifying schema for Spark Dataframe using String

In [33]:
user_2 = [
        (1,
         'Matias',
         'Dibo',
         'cvmdibo@email.com',
         True,
         1000.0,
         datetime.datetime(2021,1,25),
         datetime.datetime(2021,12,3)) ,
        
        (2,
         'Juan',
         'Estevanez',
         'cvjestevanez@email.com',
         True,
         152.5,
         datetime.datetime(2020,6,12),
         datetime.datetime(2022,1,4)) ,
        
        (3,
         'Alejandra',
         'Funes',
         'cvafunes@email.com',
         False,
         None,
         None,
         datetime.datetime(2022,3,31))
]

In [34]:
user_2_schema = '''
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  is_customer BOOLEAN,
  amount_paid DOUBLE,
  customer_from DATE,
  last_updated_ts DATE
'''

In [35]:
users_df_2 = spark.createDataFrame(user_2, schema = user_2_schema)

users_df_2

DataFrame[id: int, first_name: string, last_name: string, email: string, is_customer: boolean, amount_paid: double, customer_from: date, last_updated_ts: date]

In [36]:
users_df_2.show(truncate=False)

+---+----------+---------+----------------------+-----------+-----------+-------------+---------------+
|id |first_name|last_name|email                 |is_customer|amount_paid|customer_from|last_updated_ts|
+---+----------+---------+----------------------+-----------+-----------+-------------+---------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |true       |1000.0     |2021-01-25   |2021-12-03     |
|2  |Juan      |Estevanez|cvjestevanez@email.com|true       |152.5      |2020-06-12   |2022-01-04     |
|3  |Alejandra |Funes    |cvafunes@email.com    |false      |null       |null         |2022-03-31     |
+---+----------+---------+----------------------+-----------+-----------+-------------+---------------+



##### Specifying schema using Spark Types

In [37]:
user_3_schema = StructType([
                           StructField('id', IntegerType()),
                           StructField('first_name', StringType()),
                           StructField('last_name', StringType()),
                           StructField('email', StringType()),
                           StructField('is_customer', BooleanType()),
                           StructField('amount_paid', DoubleType()),
                           StructField('customer_from', DateType()),
                           StructField('last_updated_ts', TimestampType())
])

In [38]:
users_df_3 = spark.createDataFrame(user_2, schema = user_3_schema)

users_df_3

DataFrame[id: int, first_name: string, last_name: string, email: string, is_customer: boolean, amount_paid: double, customer_from: date, last_updated_ts: timestamp]

In [39]:
users_df_3.show(truncate=False)

+---+----------+---------+----------------------+-----------+-----------+-------------+-------------------+
|id |first_name|last_name|email                 |is_customer|amount_paid|customer_from|last_updated_ts    |
+---+----------+---------+----------------------+-----------+-----------+-------------+-------------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |true       |1000.0     |2021-01-25   |2021-12-03 00:00:00|
|2  |Juan      |Estevanez|cvjestevanez@email.com|true       |152.5      |2020-06-12   |2022-01-04 00:00:00|
|3  |Alejandra |Funes    |cvafunes@email.com    |false      |null       |null         |2022-03-31 00:00:00|
+---+----------+---------+----------------------+-----------+-----------+-------------+-------------------+



##### Create Spark dataframe using Pandas

In [40]:
df_user = pd.DataFrame(user)

df_user

Unnamed: 0,id,first_name,last_name,email,is_customer,amount_paid,customer_from,last_updated_ts
0,1,Matias,Dibo,cvmdibo@email.com,True,1000.0,2021-01-25,2021-12-03
1,2,Juan,Estevanez,cvjestevanez@email.com,True,152.5,2020-06-12,2022-01-04


In [41]:
spark.createDataFrame(df_user).show()

+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+-----------+-----------+-------------------+-------------------+



In [42]:
spark.createDataFrame(df_user).printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: timestamp (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



##### Array type columns in Spark dataframe

In [43]:
user_3 = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'phone_numbers':['+5493512500000'],
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':['+5493512999999', '+548692214553'],
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)}
]

In [44]:
users_df_4 = spark.createDataFrame([Row(**i) for i in user_3])

In [45]:
users_df_4.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: timestamp (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



In [46]:
users_df_4.select('id','phone_numbers').show(truncate=False)

+---+-------------------------------+
|id |phone_numbers                  |
+---+-------------------------------+
|1  |[+5493512500000]               |
|2  |[+5493512999999, +548692214553]|
+---+-------------------------------+



In [47]:
users_df_4.withColumn('phone_number', explode('phone_numbers')).drop('phone_numbers').show(truncate=False)

+---+----------+---------+----------------------+-----------+-----------+-------------------+-------------------+--------------+
|id |first_name|last_name|email                 |is_customer|amount_paid|customer_from      |last_updated_ts    |phone_number  |
+---+----------+---------+----------------------+-----------+-----------+-------------------+-------------------+--------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |true       |1000.0     |2021-01-25 00:00:00|2021-12-03 00:00:00|+5493512500000|
|2  |Juan      |Estevanez|cvjestevanez@email.com|true       |152.5      |2020-06-12 00:00:00|2022-01-04 00:00:00|+5493512999999|
|2  |Juan      |Estevanez|cvjestevanez@email.com|true       |152.5      |2020-06-12 00:00:00|2022-01-04 00:00:00|+548692214553 |
+---+----------+---------+----------------------+-----------+-----------+-------------------+-------------------+--------------+



In [48]:
users_df_4.select('id', users_df_4.phone_numbers[0].alias('mobile'), users_df_4.phone_numbers[1].alias('home')).show()

+---+--------------+-------------+
| id|        mobile|         home|
+---+--------------+-------------+
|  1|+5493512500000|         null|
|  2|+5493512999999|+548692214553|
+---+--------------+-------------+



##### Map type columns in Spark dataframe

In [59]:
user_4 = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'phone_numbers':{'mobile':'+5493512500000'},
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':{'mobile':'+5493512999999', 'home':'+548692214553'},
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)}
]

In [60]:
users_df_5 = spark.createDataFrame([Row(**i) for i in user_4])

In [61]:
users_df_5.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: timestamp (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



In [62]:
users_df_5.select('id','phone_numbers').show(truncate=False)

+---+-------------------------------------------------+
|id |phone_numbers                                    |
+---+-------------------------------------------------+
|1  |[mobile -> +5493512500000]                       |
|2  |[mobile -> +5493512999999, home -> +548692214553]|
+---+-------------------------------------------------+



In [63]:
users_df_5.select('id', users_df_5.phone_numbers.mobile.alias('mobile')\
                  , users_df_5.phone_numbers.home.alias('home')).show(truncate=False)

+---+--------------+-------------+
|id |mobile        |home         |
+---+--------------+-------------+
|1  |+5493512500000|null         |
|2  |+5493512999999|+548692214553|
+---+--------------+-------------+



In [64]:
users_df_5.select('id', explode('phone_numbers'))\
  .withColumnRenamed('key','phone_type')\
  .withColumnRenamed('value','phone_number').show(truncate=False)

+---+----------+--------------+
|id |phone_type|phone_number  |
+---+----------+--------------+
|1  |mobile    |+5493512500000|
|2  |mobile    |+5493512999999|
|2  |home      |+548692214553 |
+---+----------+--------------+



##### Select on Spark dataframe

In [65]:
users_df_4.select('*').show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|    [+5493512500000]|       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[+5493512999999, ...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+



In [66]:
users_df_4.select('id', 'first_name', 'last_name').show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|  1|    Matias|     Dibo|
|  2|      Juan|Estevanez|
+---+----------+---------+



In [67]:
users_df_4.alias('copy').select('copy.first_name','copy.last_name').show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Matias|     Dibo|
|      Juan|Estevanez|
+----------+---------+



In [68]:
# Copy of Spark dataframe
copy_df_4 = users_df_4.alias('users_df_4_copy')

copy_df_4.show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|    [+5493512500000]|       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[+5493512999999, ...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+



In [69]:
users_df_4.select('id',col('first_name'),col('last_name')).show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|  1|    Matias|     Dibo|
|  2|      Juan|Estevanez|
+---+----------+---------+



In [70]:
users_df_4.select('id', col('first_name'),\
                  'last_name',\
                  concat('first_name', lit(' '),'last_name').alias('full_name'))\
                  .show()

+---+----------+---------+--------------+
| id|first_name|last_name|     full_name|
+---+----------+---------+--------------+
|  1|    Matias|     Dibo|   Matias Dibo|
|  2|      Juan|Estevanez|Juan Estevanez|
+---+----------+---------+--------------+



##### SelectExpr on Spark dataframe

In [71]:
user_6 = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'phone_numbers':['+5493512500000'],
         'courses':[1,6],
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':['+5493512999999', '+548692214553'],
         'courses':[3],
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)}
]

In [72]:
users_df_7 = spark.createDataFrame([Row(**i) for i in user_6])

In [73]:
users_df_7.show(truncate=False)

+---+----------+---------+----------------------+-------------------------------+-------+-----------+-----------+-------------------+-------------------+
|id |first_name|last_name|email                 |phone_numbers                  |courses|is_customer|amount_paid|customer_from      |last_updated_ts    |
+---+----------+---------+----------------------+-------------------------------+-------+-----------+-----------+-------------------+-------------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |[+5493512500000]               |[1, 6] |true       |1000.0     |2021-01-25 00:00:00|2021-12-03 00:00:00|
|2  |Juan      |Estevanez|cvjestevanez@email.com|[+5493512999999, +548692214553]|[3]    |true       |152.5      |2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+----------------------+-------------------------------+-------+-----------+-----------+-------------------+-------------------+



In [74]:
temp_df_1 = users_df_7.select('id',concat('first_name',lit(' '),'last_name').alias('full_name'),explode('courses').alias('courses'))

temp_df_1.show()

+---+--------------+-------+
| id|     full_name|courses|
+---+--------------+-------+
|  1|   Matias Dibo|      1|
|  1|   Matias Dibo|      6|
|  2|Juan Estevanez|      3|
+---+--------------+-------+



In [75]:
temp_df_1.selectExpr('id', 'courses * 2').show()

+---+-------------+
| id|(courses * 2)|
+---+-------------+
|  1|            2|
|  1|           12|
|  2|            6|
+---+-------------+



In [76]:
# create local temporary view
users_df_7.createOrReplaceTempView('users_6_table')

In [77]:
spark.sql('''
        SELECT id, concat(first_name, ' ', 'last_name') AS full_name, courses
        FROM users_6_table
''').show()

+---+----------------+-------+
| id|       full_name|courses|
+---+----------------+-------+
|  1|Matias last_name| [1, 6]|
|  2|  Juan last_name|    [3]|
+---+----------------+-------+



##### Referring columns using Spark dataframe names

In [None]:
users_df_7.alias('u').select('u.id', 'last_name').show()

+---+---------+
| id|last_name|
+---+---------+
|  1|     Dibo|
|  2|Estevanez|
+---+---------+



In [None]:
# selectExpr only works with string type
users_df_7.selectExpr('id', 'first_name', 'last_name',\
                      "concat(first_name,' ',last_name) AS full_name").show()

+---+----------+---------+--------------+
| id|first_name|last_name|     full_name|
+---+----------+---------+--------------+
|  1|    Matias|     Dibo|   Matias Dibo|
|  2|      Juan|Estevanez|Juan Estevanez|
+---+----------+---------+--------------+



In [None]:
users_df_7.createOrReplaceTempView('temp_view_df_7')

In [None]:
spark.sql('''
        SELECT u.id, u.first_name, u.last_name
        FROM temp_view_df_7 u
''').show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|  1|    Matias|     Dibo|
|  2|      Juan|Estevanez|
+---+----------+---------+



##### Col function in Spark

In [None]:
cols = ['id', 'first_name', 'last_name']

users_df_7.select(cols).show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|  1|    Matias|     Dibo|
|  2|      Juan|Estevanez|
+---+----------+---------+



In [None]:
col_id = col('id')

users_df_7.select(col_id).show()

+---+
| id|
+---+
|  1|
|  2|
+---+



In [None]:
users_df_7.select('id', 'customer_from').show()

+---+-------------------+
| id|      customer_from|
+---+-------------------+
|  1|2021-01-25 00:00:00|
|  2|2020-06-12 00:00:00|
+---+-------------------+



In [None]:
users_df_7.select('id', 'customer_from').dtypes

[('id', 'bigint'), ('customer_from', 'timestamp')]

In [None]:
users_df_7.select('id', date_format('customer_from', 'yyyyMMdd').alias('date')).show()

+---+--------+
| id|    date|
+---+--------+
|  1|20210125|
|  2|20200612|
+---+--------+



In [None]:
users_df_7.select('id', date_format('customer_from', 'yyyyMMdd')\
                  .alias('date')).dtypes

[('id', 'bigint'), ('date', 'string')]

In [None]:
users_df_7.select('id', date_format('customer_from', 'yyyyMMdd')\
                  .alias('date').cast('int')).dtypes

[('id', 'bigint'), ('date', 'int')]

In [None]:
full_name_col = concat('first_name',lit(' '),'last_name').alias('full_name')

users_df_7.select('id', full_name_col).show()

+---+--------------+
| id|     full_name|
+---+--------------+
|  1|   Matias Dibo|
|  2|Juan Estevanez|
+---+--------------+



In [None]:
customer_from_col = date_format('customer_from','yyyyMMdd')\
                    .alias('customer_from').cast('int')

users_df_7.select('id', customer_from_col).show()

+---+-------------+
| id|customer_from|
+---+-------------+
|  1|     20210125|
|  2|     20200612|
+---+-------------+



In [None]:
spark.sql('''
      SELECT id, amount_paid, (amount_paid + 10) amount_paid_plus_10
      FROM temp_view_df_7
''').show()

+---+-----------+-------------------+
| id|amount_paid|amount_paid_plus_10|
+---+-----------+-------------------+
|  1|     1000.0|             1010.0|
|  2|      152.5|              162.5|
+---+-----------+-------------------+



In [None]:
users_df_7.select('id','amount_paid', (col('amount_paid')+10)\
                  .alias('amount_paid_plus_10')).show()

+---+-----------+-------------------+
| id|amount_paid|amount_paid_plus_10|
+---+-----------+-------------------+
|  1|     1000.0|             1010.0|
|  2|      152.5|              162.5|
+---+-----------+-------------------+



##### Naming derived columns using withColumn

In [None]:
users_df_7.select('id', 'first_name', 'last_name').\
          withColumn('full_name', concat('first_name',lit(' '),'last_name')).show()

+---+----------+---------+--------------+
| id|first_name|last_name|     full_name|
+---+----------+---------+--------------+
|  1|    Matias|     Dibo|   Matias Dibo|
|  2|      Juan|Estevanez|Juan Estevanez|
+---+----------+---------+--------------+



In [None]:
# replace a column on Spark dataframe with derived column
users_df_7.select('id', 'first_name', 'last_name').\
          withColumn('first_name', concat('first_name',lit(' '),'last_name')).show()

+---+--------------+---------+
| id|    first_name|last_name|
+---+--------------+---------+
|  1|   Matias Dibo|     Dibo|
|  2|Juan Estevanez|Estevanez|
+---+--------------+---------+



In [None]:
users_df_7.select('id', 'courses').\
          withColumn('course_count', size('courses')).show()

+---+-------+------------+
| id|courses|course_count|
+---+-------+------------+
|  1| [1, 6]|           2|
|  2|    [3]|           1|
+---+-------+------------+



##### Naming derived columns using withColumnRenamed or alias

In [None]:
users_df_7.select('id', 'first_name', 'last_name').\
          withColumnRenamed('first_name', 'fn').\
          withColumnRenamed('last_name', 'ln').\
          show()

+---+------+---------+
| id|    fn|       ln|
+---+------+---------+
|  1|Matias|     Dibo|
|  2|  Juan|Estevanez|
+---+------+---------+



In [None]:
users_df_7.select('id', col('first_name').alias('fn')\
                  ,col('last_name').alias('ln')).show()

+---+------+---------+
| id|    fn|       ln|
+---+------+---------+
|  1|Matias|     Dibo|
|  2|  Juan|Estevanez|
+---+------+---------+



##### Renaming and reordering multiple Spark dataframes

In [None]:
required_cols = ['id','first_name','last_name','email','phone_numbers']

target_columns = ['user_id','user_first_name','user_last_name','user_mail','user_phone_numbers']

users_df_7.select(required_cols)\
          .toDF(*target_columns)\
          .show(truncate=False)

+-------+---------------+--------------+----------------------+-------------------------------+
|user_id|user_first_name|user_last_name|user_mail             |user_phone_numbers             |
+-------+---------------+--------------+----------------------+-------------------------------+
|1      |Matias         |Dibo          |cvmdibo@email.com     |[+5493512500000]               |
|2      |Juan           |Estevanez     |cvjestevanez@email.com|[+5493512999999, +548692214553]|
+-------+---------------+--------------+----------------------+-------------------------------+



#### Manipulating columns in Spark dataframes

##### Pre-defined functions

In [None]:
order_shema = StructType(). \
                add('id', IntegerType()). \
                add('order_date', DateType()). \
                add('oder_customer_id', IntegerType()). \
                add('order_status', StringType())

In [None]:
orders = spark.read.csv('/content/drive/MyDrive/retail_db-master/csv_files/orders', schema=order_shema)

In [None]:
orders.printSchema()

root
 |-- id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- oder_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [None]:
orders.show(5)

+---+----------+----------------+---------------+
| id|order_date|oder_customer_id|   order_status|
+---+----------+----------------+---------------+
|  1|2013-07-25|           11599|         CLOSED|
|  2|2013-07-25|             256|PENDING_PAYMENT|
|  3|2013-07-25|           12111|       COMPLETE|
|  4|2013-07-25|            8827|         CLOSED|
|  5|2013-07-25|           11318|       COMPLETE|
+---+----------+----------------+---------------+
only showing top 5 rows



In [None]:
orders.select('*', date_format('order_date', 'yyyyMM').alias('order_month')).show(5)

+---+----------+----------------+---------------+-----------+
| id|order_date|oder_customer_id|   order_status|order_month|
+---+----------+----------------+---------------+-----------+
|  1|2013-07-25|           11599|         CLOSED|     201307|
|  2|2013-07-25|             256|PENDING_PAYMENT|     201307|
|  3|2013-07-25|           12111|       COMPLETE|     201307|
|  4|2013-07-25|            8827|         CLOSED|     201307|
|  5|2013-07-25|           11318|       COMPLETE|     201307|
+---+----------+----------------+---------------+-----------+
only showing top 5 rows



In [None]:
orders.withColumn('order_month', date_format('order_date', 'yyyyMM')).show(5)

+---+----------+----------------+---------------+-----------+
| id|order_date|oder_customer_id|   order_status|order_month|
+---+----------+----------------+---------------+-----------+
|  1|2013-07-25|           11599|         CLOSED|     201307|
|  2|2013-07-25|             256|PENDING_PAYMENT|     201307|
|  3|2013-07-25|           12111|       COMPLETE|     201307|
|  4|2013-07-25|            8827|         CLOSED|     201307|
|  5|2013-07-25|           11318|       COMPLETE|     201307|
+---+----------+----------------+---------------+-----------+
only showing top 5 rows



In [None]:
# filter data with order_date in 2014-01
orders.filter(date_format('order_date','yyyyMM') == 201401).show(5)

+-----+----------+----------------+---------------+
|   id|order_date|oder_customer_id|   order_status|
+-----+----------+----------------+---------------+
|25876|2014-01-01|            3414|PENDING_PAYMENT|
|25877|2014-01-01|            5549|PENDING_PAYMENT|
|25878|2014-01-01|            9084|        PENDING|
|25879|2014-01-01|            5118|        PENDING|
|25880|2014-01-01|           10146|       CANCELED|
+-----+----------+----------------+---------------+
only showing top 5 rows



In [None]:
orders.groupBy(date_format('order_date','yyyyMM').alias('order_month')). \
               count().sort(col('order_month').desc()).show(5)

+-----------+-----+
|order_month|count|
+-----------+-----+
|     201407| 4468|
|     201406| 5308|
|     201405| 5467|
|     201404| 5657|
|     201403| 5778|
+-----------+-----+
only showing top 5 rows



##### Create dummy dataframe

In [None]:
orders.select(current_date().alias('current_date')).show(5)

+------------+
|current_date|
+------------+
|  2022-04-19|
|  2022-04-19|
|  2022-04-19|
|  2022-04-19|
|  2022-04-19|
+------------+
only showing top 5 rows



In [None]:
employees = [(1, "Steve", "Ramirez", 1000.0, "USA", "+549 35199999554", "35199999554"),
             (2, "John", "Smith", 1500.0, "England", "+548 35199456987", "35199456987"),
             (3, "Nick", "Warren", 750.0, "Australia", "+547 35199456666", "35199456666"),
             (4, "Bill", "Clinton", 2000.0, "USA", "+549 351994784666", "35199478466")]

dum_schema = StructType(). \
              add('id', IntegerType()). \
              add('first_name', StringType()). \
              add('last_name', StringType()). \
              add('salary', DoubleType()).add('country', StringType()). \
              add('phone_number', StringType()). \
              add('SSN', StringType())

In [None]:
dummy_df = spark.createDataFrame(employees, schema=dum_schema)

In [None]:
dummy_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- country: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- SSN: string (nullable = true)



In [None]:
dummy_df.show(truncate=False)

+---+----------+---------+------+---------+-----------------+-----------+
|id |first_name|last_name|salary|country  |phone_number     |SSN        |
+---+----------+---------+------+---------+-----------------+-----------+
|1  |Steve     |Ramirez  |1000.0|USA      |+549 35199999554 |35199999554|
|2  |John      |Smith    |1500.0|England  |+548 35199456987 |35199456987|
|3  |Nick      |Warren   |750.0 |Australia|+547 35199456666 |35199456666|
|4  |Bill      |Clinton  |2000.0|USA      |+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----------------+-----------+



In [None]:
# if we want to apply a transformation using some of the functions, then passing 
# column names as STRING won't be suffice. We have to pass them as column type

dummy_df.select(upper(col('first_name')).alias('first_name')
              , upper(col('last_name')).alias('last_name')).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     STEVE|  RAMIREZ|
|      JOHN|    SMITH|
|      NICK|   WARREN|
|      BILL|  CLINTON|
+----------+---------+



In [None]:
dummy_df.orderBy(col('last_name').desc()).show()

+---+----------+---------+------+---------+-----------------+-----------+
| id|first_name|last_name|salary|  country|     phone_number|        SSN|
+---+----------+---------+------+---------+-----------------+-----------+
|  3|      Nick|   Warren| 750.0|Australia| +547 35199456666|35199456666|
|  2|      John|    Smith|1500.0|  England| +548 35199456987|35199456987|
|  1|     Steve|  Ramirez|1000.0|      USA| +549 35199999554|35199999554|
|  4|      Bill|  Clinton|2000.0|      USA|+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----------------+-----------+



In [None]:
dummy_df.orderBy(dummy_df['last_name'].desc()).show()

+---+----------+---------+------+---------+-----------------+-----------+
| id|first_name|last_name|salary|  country|     phone_number|        SSN|
+---+----------+---------+------+---------+-----------------+-----------+
|  3|      Nick|   Warren| 750.0|Australia| +547 35199456666|35199456666|
|  2|      John|    Smith|1500.0|  England| +548 35199456987|35199456987|
|  1|     Steve|  Ramirez|1000.0|      USA| +549 35199999554|35199999554|
|  4|      Bill|  Clinton|2000.0|      USA|+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----------------+-----------+



##### String manipulation

In [None]:
dummy_df.select(concat('first_name',lit(' '),'last_name').alias('full_name')).show()

+-------------+
|    full_name|
+-------------+
|Steve Ramirez|
|   John Smith|
|  Nick Warren|
| Bill Clinton|
+-------------+



In [None]:
dummy_df.select('id', 'country'). \
        withColumn('country_upper', upper(col('country'))). \
        withColumn('country_lower', lower(col('country'))). \
        withColumn('country_init_cap', initcap(col('country'))). \
        withColumn('country_length', length('country')). \
        show()

+---+---------+-------------+-------------+----------------+--------------+
| id|  country|country_upper|country_lower|country_init_cap|country_length|
+---+---------+-------------+-------------+----------------+--------------+
|  1|      USA|          USA|          usa|             Usa|             3|
|  2|  England|      ENGLAND|      england|         England|             7|
|  3|Australia|    AUSTRALIA|    australia|       Australia|             9|
|  4|      USA|          USA|          usa|             Usa|             3|
+---+---------+-------------+-------------+----------------+--------------+



In [None]:
dummy_df.select(substring(lit('Hello World'), 1, 5)).show(1)

+----------------------------+
|substring(Hello World, 1, 5)|
+----------------------------+
|                       Hello|
+----------------------------+
only showing top 1 row



In [None]:
dummy_df.select(substring(lit('Hello World'), -5, 5)).show(1)

+-----------------------------+
|substring(Hello World, -5, 5)|
+-----------------------------+
|                        World|
+-----------------------------+
only showing top 1 row



In [None]:
dummy_df.select('id', concat('first_name', lit(' '), 'last_name').alias('full_name'), 'phone_number') \
        .withColumn('country_code', substring(col('phone_number'),2, 3).cast('int')) \
        .show()

+---+-------------+-----------------+------------+
| id|    full_name|     phone_number|country_code|
+---+-------------+-----------------+------------+
|  1|Steve Ramirez| +549 35199999554|         549|
|  2|   John Smith| +548 35199456987|         548|
|  3|  Nick Warren| +547 35199456666|         547|
|  4| Bill Clinton|+549 351994784666|         549|
+---+-------------+-----------------+------------+



##### Padding characters around strings

In [None]:
dummy_df.select(lpad(lit('Hello'), 10, '-').alias('dummy')).show(1)

+----------+
|     dummy|
+----------+
|-----Hello|
+----------+
only showing top 1 row



In [None]:
dummy_df.select(rpad(lit('Hello'), 10, '-').alias('dummy')).show(1)

+----------+
|     dummy|
+----------+
|Hello-----|
+----------+
only showing top 1 row



In [None]:
fixed_dummy_df = dummy_df.select(concat(
    lpad(dummy_df.id, 5, '0'),
    lit('--'), 
    rpad(dummy_df.first_name, 10, '-'),
    rpad(dummy_df.last_name, 10, '-'),
    lpad(dummy_df.salary, 10, '0'),
    lit('--'),
    rpad(dummy_df.country, 15, '-'),
    rpad(dummy_df.phone_number, 17, '-'),
    lit('--'),
    dummy_df.SSN
).alias('employee_full_record'))

fixed_dummy_df.show(truncate=False)

+------------------------------------------------------------------------------------+
|employee_full_record                                                                |
+------------------------------------------------------------------------------------+
|00001--Steve-----Ramirez---00001000.0--USA------------+549 35199999554---35199999554|
|00002--John------Smith-----00001500.0--England--------+548 35199456987---35199456987|
|00003--Nick------Warren----00000750.0--Australia------+547 35199456666---35199456666|
|00004--Bill------Clinton---00002000.0--USA------------+549 351994784666--35199478466|
+------------------------------------------------------------------------------------+



##### Trimming characters from strings


In [None]:
s = [('   Hello.    ',)]

s_df = spark.createDataFrame(s, schema='''dummy STRING''')

In [None]:
s_df.withColumn('ltrim', ltrim(s_df.dummy)) \
    .withColumn('rtrim', rtrim(s_df.dummy)) \
    .withColumn('trim', trim(s_df.dummy)) \
    .show()

+-------------+----------+---------+------+
|        dummy|     ltrim|    rtrim|  trim|
+-------------+----------+---------+------+
|   Hello.    |Hello.    |   Hello.|Hello.|
+-------------+----------+---------+------+



In [None]:
s_df.withColumn('ltrim', ltrim(s_df.dummy)) \
    .withColumn('rtrim', rtrim(s_df.dummy)) \
    .withColumn('trim', translate(trim(s_df.dummy), '.', '')) \
    .show()

+-------------+----------+---------+-----+
|        dummy|     ltrim|    rtrim| trim|
+-------------+----------+---------+-----+
|   Hello.    |Hello.    |   Hello.|Hello|
+-------------+----------+---------+-----+



In [None]:
s_df.withColumn('ltrim', expr("TRIM(LEADING ' ' FROM dummy)")) \
    .withColumn('rtrim', expr("TRIM(TRAILING ' ' FROM dummy)")) \
    .withColumn('trim', expr("TRIM(BOTH '.' FROM rtrim(dummy))")) \
    .show()

+-------------+----------+---------+--------+
|        dummy|     ltrim|    rtrim|    trim|
+-------------+----------+---------+--------+
|   Hello.    |Hello.    |   Hello.|   Hello|
+-------------+----------+---------+--------+



##### Date and time manipulation

In [None]:
s_df.select(current_date().alias('date')).show()

+----------+
|      date|
+----------+
|2022-04-19|
+----------+



In [None]:
s_df.select(current_timestamp().alias('timestamp')).show(truncate=False)

+-----------------------+
|timestamp              |
+-----------------------+
|2022-04-19 09:11:22.371|
+-----------------------+



In [None]:
s_df.select(to_date(lit('20220331'), 'yyyyMMdd').alias('date')).show()

+----------+
|      date|
+----------+
|2022-03-31|
+----------+



In [None]:
s_df.select(to_timestamp(lit('20220331 2135'), 'yyyyMMdd HHmm')\
            .alias('date')).show()

+-------------------+
|               date|
+-------------------+
|2022-03-31 21:35:00|
+-------------------+



##### Date and time arithmetic

In [None]:
dates = [("2014-02-08", "2014-02-08 10:00:00.123"),
         ("2016-02-29", "2016-02-29 08:08:08.999"),
         ("2017-10-31", "2017-12-31 11:59:59.123"),
         ("2019-11-30", "2019-08-31 00:00:00.000")]

In [None]:
date_schema = StructType(). \
              add('date', StringType()). \
              add('time', StringType())

df_datetime = spark.createDataFrame(dates, schema=date_schema)

In [None]:
df_datetime.withColumn('date_add', date_add('date', 10)) \
            .withColumn('time_add', date_add('time', 10)) \
            .withColumn('date_sub', date_sub('date', 10)) \
            .withColumn('time_sub', date_sub('time', 10)) \
            .show(truncate=False)

+----------+-----------------------+----------+----------+----------+----------+
|date      |time                   |date_add  |time_add  |date_sub  |time_sub  |
+----------+-----------------------+----------+----------+----------+----------+
|2014-02-08|2014-02-08 10:00:00.123|2014-02-18|2014-02-18|2014-01-29|2014-01-29|
|2016-02-29|2016-02-29 08:08:08.999|2016-03-10|2016-03-10|2016-02-19|2016-02-19|
|2017-10-31|2017-12-31 11:59:59.123|2017-11-10|2018-01-10|2017-10-21|2017-12-21|
|2019-11-30|2019-08-31 00:00:00.000|2019-12-10|2019-09-10|2019-11-20|2019-08-21|
+----------+-----------------------+----------+----------+----------+----------+



In [None]:
df_datetime.withColumn('datediff_date', datediff(current_date(), 'date')) \
            .withColumn('datediff_time', datediff(current_timestamp(), 'time')) \
            .show(truncate=False)

+----------+-----------------------+-------------+-------------+
|date      |time                   |datediff_date|datediff_time|
+----------+-----------------------+-------------+-------------+
|2014-02-08|2014-02-08 10:00:00.123|2992         |2992         |
|2016-02-29|2016-02-29 08:08:08.999|2241         |2241         |
|2017-10-31|2017-12-31 11:59:59.123|1631         |1570         |
|2019-11-30|2019-08-31 00:00:00.000|871          |962          |
+----------+-----------------------+-------------+-------------+



In [None]:
df_datetime.withColumn('month_between_date', round(months_between(current_date(), 'date'), 2)) \
            .withColumn('month_between_time', round(months_between(current_timestamp(), 'time'), 2)) \
            .withColumn('add_month_date', add_months('date', 3)) \
            .withColumn('add_month_time', add_months('time', 3)) \
            .show(truncate=False)

+----------+-----------------------+------------------+------------------+--------------+--------------+
|date      |time                   |month_between_date|month_between_time|add_month_date|add_month_time|
+----------+-----------------------+------------------+------------------+--------------+--------------+
|2014-02-08|2014-02-08 10:00:00.123|98.35             |98.35             |2014-05-08    |2014-05-08    |
|2016-02-29|2016-02-29 08:08:08.999|73.68             |73.68             |2016-05-29    |2016-05-29    |
|2017-10-31|2017-12-31 11:59:59.123|53.61             |51.61             |2018-01-31    |2018-03-31    |
|2019-11-30|2019-08-31 00:00:00.000|28.65             |31.63             |2020-02-29    |2019-11-30    |
+----------+-----------------------+------------------+------------------+--------------+--------------+



##### Date and time trunc functions

In [None]:
df_datetime.withColumn('date_trunc', trunc('date', 'mm')) \
            .withColumn('time_trunc', trunc('time', 'YY')) \
            .show(truncate=False)

+----------+-----------------------+----------+----------+
|date      |time                   |date_trunc|time_trunc|
+----------+-----------------------+----------+----------+
|2014-02-08|2014-02-08 10:00:00.123|2014-02-01|2014-01-01|
|2016-02-29|2016-02-29 08:08:08.999|2016-02-01|2016-01-01|
|2017-10-31|2017-12-31 11:59:59.123|2017-10-01|2017-01-01|
|2019-11-30|2019-08-31 00:00:00.000|2019-11-01|2019-01-01|
+----------+-----------------------+----------+----------+



In [None]:
df_datetime.withColumn('date_dt', date_trunc('MM', 'date')) \
            .withColumn('time_dt', date_trunc('YY', 'time')) \
            .show(truncate=False)

+----------+-----------------------+-------------------+-------------------+
|date      |time                   |date_dt            |time_dt            |
+----------+-----------------------+-------------------+-------------------+
|2014-02-08|2014-02-08 10:00:00.123|2014-02-01 00:00:00|2014-01-01 00:00:00|
|2016-02-29|2016-02-29 08:08:08.999|2016-02-01 00:00:00|2016-01-01 00:00:00|
|2017-10-31|2017-12-31 11:59:59.123|2017-10-01 00:00:00|2017-01-01 00:00:00|
|2019-11-30|2019-08-31 00:00:00.000|2019-11-01 00:00:00|2019-01-01 00:00:00|
+----------+-----------------------+-------------------+-------------------+



In [None]:
df_datetime.withColumn('date_dt', date_trunc('HOUR', 'date')) \
            .withColumn('time_dt', date_trunc('HOUR', 'time')) \
            .withColumn('time_dt_d', date_trunc('dd', 'time')) \
            .show(truncate=False)

+----------+-----------------------+-------------------+-------------------+-------------------+
|date      |time                   |date_dt            |time_dt            |time_dt_d          |
+----------+-----------------------+-------------------+-------------------+-------------------+
|2014-02-08|2014-02-08 10:00:00.123|2014-02-08 00:00:00|2014-02-08 10:00:00|2014-02-08 00:00:00|
|2016-02-29|2016-02-29 08:08:08.999|2016-02-29 00:00:00|2016-02-29 08:00:00|2016-02-29 00:00:00|
|2017-10-31|2017-12-31 11:59:59.123|2017-10-31 00:00:00|2017-12-31 11:00:00|2017-12-31 00:00:00|
|2019-11-30|2019-08-31 00:00:00.000|2019-11-30 00:00:00|2019-08-31 00:00:00|2019-08-31 00:00:00|
+----------+-----------------------+-------------------+-------------------+-------------------+



##### Date and time extract functions

In [None]:
s_df.select(
    current_date().alias('current_date'),
    year(current_date()).alias('current_year'),
    month(current_date()).alias('current_month'),
    weekofyear(current_date()).alias('weekofyear'),
    dayofyear(current_date()).alias('dayofyear'),
    dayofmonth(current_date()).alias('dayofmonth'),
    dayofweek(current_date()).alias('dayofweek')
).show(truncate=False)

+------------+------------+-------------+----------+---------+----------+---------+
|current_date|current_year|current_month|weekofyear|dayofyear|dayofmonth|dayofweek|
+------------+------------+-------------+----------+---------+----------+---------+
|2022-04-19  |2022        |4            |16        |109      |19        |3        |
+------------+------------+-------------+----------+---------+----------+---------+



In [None]:
s_df.select(
    current_timestamp().alias('current_date'),
    year(current_timestamp()).alias('current_year'),
    month(current_timestamp()).alias('current_month'),
    weekofyear(current_timestamp()).alias('weekofyear'),
    dayofyear(current_timestamp()).alias('dayofyear'),
    dayofmonth(current_timestamp()).alias('dayofmonth'),
    dayofweek(current_timestamp()).alias('dayofweek'),
    hour(current_timestamp()).alias('hour'),
    minute(current_timestamp()).alias('minute'),
    second(current_timestamp()).alias('second')
).show(truncate=False)

+-----------------------+------------+-------------+----------+---------+----------+---------+----+------+------+
|current_date           |current_year|current_month|weekofyear|dayofyear|dayofmonth|dayofweek|hour|minute|second|
+-----------------------+------------+-------------+----------+---------+----------+---------+----+------+------+
|2022-04-19 09:11:43.152|2022        |4            |16        |109      |19        |3        |9   |11    |43    |
+-----------------------+------------+-------------+----------+---------+----------+---------+----+------+------+



##### to_date and to_timestamp functions

In [None]:
datetimes_2 = [(20140228, '28-Feb-2014 10:00:00.123'),
               (20160229, '20-Feb-2016 08:08:08.999'),
               (20171031, '31-Dec-2017 11:59:59.123'),
               (20191130, '31-Aug-2019 00:00:00.000')]

In [None]:
df_datetime_2 = spark.createDataFrame(datetimes_2, schema='''
                                                          date INT,
                                                          time STRING
                                                          ''')

In [None]:
df_datetime_2.select(to_date(lit('20220409'), 'yyyyMMdd').alias('to_date')).show(1)

+----------+
|   to_date|
+----------+
|2022-04-09|
+----------+
only showing top 1 row



In [None]:
df_datetime_2.select(to_date(lit('2022/04/09'), 'yyyy/MM/dd').alias('to_date')).show(1)

+----------+
|   to_date|
+----------+
|2022-04-09|
+----------+
only showing top 1 row



In [None]:
df_datetime_2.select(to_date(lit('2022-Apr-09'), 'yyyy-MMM-dd').alias('to_date')).show(1)

+----------+
|   to_date|
+----------+
|2022-04-09|
+----------+
only showing top 1 row



In [None]:
df_datetime_2.select(to_date(lit('2022-April-09'), 'yyyy-MMMM-dd').alias('to_date')).show(1)

+----------+
|   to_date|
+----------+
|2022-04-09|
+----------+
only showing top 1 row



In [None]:
df_datetime_2.select(to_date(lit('April 09, 2022'), 'MMMM dd, yyyy').alias('to_date')).show(1)

+----------+
|   to_date|
+----------+
|2022-04-09|
+----------+
only showing top 1 row



In [None]:
df_datetime_2.show(truncate=False)

+--------+------------------------+
|date    |time                    |
+--------+------------------------+
|20140228|28-Feb-2014 10:00:00.123|
|20160229|20-Feb-2016 08:08:08.999|
|20171031|31-Dec-2017 11:59:59.123|
|20191130|31-Aug-2019 00:00:00.000|
+--------+------------------------+



In [None]:
df_datetime_2.select(to_date(col('date').cast('string'), 'yyyyMMdd').alias('date'),
                     to_timestamp('time', 'dd-MMM-yyyy HH:mm:ss.SSS').alias('date')) \
                     .show(truncate=False)

+----------+-----------------------+
|date      |date                   |
+----------+-----------------------+
|2014-02-28|2014-02-28 10:00:00.123|
|2016-02-29|2016-02-20 08:08:08.999|
|2017-10-31|2017-12-31 11:59:59.123|
|2019-11-30|2019-08-31 00:00:00    |
+----------+-----------------------+



##### Dealing with nulls

In [None]:
employees_na = [(1, "Steve", "Ramirez", 1000.0, "USA", '15', "+549 35199999554", "35199999554"),
             (2, "John", "Smith", 1500.0, "England", None, "+548 35199456987", "35199456987"),
             (3, "Nick", "Warren", 750.0, "Australia", '', "+547 35199456666", "35199456666"),
             (4, "Bill", "Clinton", 2000.0, "USA", None, "+549 351994784666", "35199478466")]

dum_schema_na = StructType(). \
              add('id', IntegerType()). \
              add('first_name', StringType()). \
              add('last_name', StringType()). \
              add('salary', DoubleType()). \
              add('country', StringType()). \
              add('bonus', StringType()). \
              add('phone_number', StringType()). \
              add('SSN', StringType())

In [None]:
dummy_na_df = spark.createDataFrame(employees_na, schema=dum_schema_na)

In [None]:
dummy_na_df.show(truncate=False)

+---+----------+---------+------+---------+-----+-----------------+-----------+
|id |first_name|last_name|salary|country  |bonus|phone_number     |SSN        |
+---+----------+---------+------+---------+-----+-----------------+-----------+
|1  |Steve     |Ramirez  |1000.0|USA      |15   |+549 35199999554 |35199999554|
|2  |John      |Smith    |1500.0|England  |null |+548 35199456987 |35199456987|
|3  |Nick      |Warren   |750.0 |Australia|     |+547 35199456666 |35199456666|
|4  |Bill      |Clinton  |2000.0|USA      |null |+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----+-----------------+-----------+



In [None]:
# Does not return the empty string with the desired value
dummy_na_df.withColumn('bonus1', coalesce('bonus', lit('0'))).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus1|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|    15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|     0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|      |
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|     0|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+



In [None]:
dummy_na_df.withColumn('bonus1', col('bonus').cast('int')).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus1|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|    15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|  null|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|  null|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|  null|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+



In [None]:
dummy_na_df.withColumn('bonus1', coalesce(col('bonus').cast('int'), lit('0'))).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus1|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|    15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|     0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|     0|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|     0|
+---+----------+---------+------+---------+-----+-----------------+-----------+------+



In [None]:
dummy_na_df.withColumn('bonus', expr('nvl(bonus, 0)')).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|
+---+----------+---------+------+---------+-----+-----------------+-----------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|
|  2|      John|    Smith|1500.0|  England|    0| +548 35199456987|35199456987|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|
|  4|      Bill|  Clinton|2000.0|      USA|    0|+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----+-----------------+-----------+



In [None]:
dummy_na_df.withColumn('bonus', expr("nvl(nullif(bonus, ''), 0)")).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|
+---+----------+---------+------+---------+-----+-----------------+-----------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|
|  2|      John|    Smith|1500.0|  England|    0| +548 35199456987|35199456987|
|  3|      Nick|   Warren| 750.0|Australia|    0| +547 35199456666|35199456666|
|  4|      Bill|  Clinton|2000.0|      USA|    0|+549 351994784666|35199478466|
+---+----------+---------+------+---------+-----+-----------------+-----------+



In [None]:
dummy_na_df.withColumn('payment', col('salary') + col('salary') * coalesce(col('bonus').cast('int'), lit(0))/100).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|payment|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554| 1150.0|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987| 1500.0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|  750.0|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466| 2000.0|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+



##### Using CASE WHEN

In [None]:
dummy_na_df.withColumn('bonus_1', coalesce(col('bonus').cast('int'), lit(0))).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus_1|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|     15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|      0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|      0|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|      0|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+



In [None]:
dummy_na_df.withColumn('bonus_1', expr("""
                                        CASE WHEN bonus IS NULL or bonus = '' THEN 0
                                             ELSE bonus END
                                        """)).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus_1|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|     15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|      0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|      0|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|      0|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+



In [None]:
dummy_na_df.withColumn('bonus_1', when((dummy_na_df.bonus.isNull()) | (dummy_na_df.bonus == lit('')), 0).otherwise(col('bonus'))).show()

+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
| id|first_name|last_name|salary|  country|bonus|     phone_number|        SSN|bonus_1|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+
|  1|     Steve|  Ramirez|1000.0|      USA|   15| +549 35199999554|35199999554|     15|
|  2|      John|    Smith|1500.0|  England| null| +548 35199456987|35199456987|      0|
|  3|      Nick|   Warren| 750.0|Australia|     | +547 35199456666|35199456666|      0|
|  4|      Bill|  Clinton|2000.0|      USA| null|+549 351994784666|35199478466|      0|
+---+----------+---------+------+---------+-----+-----------------+-----------+-------+



In [None]:
persons = [(1, 1),
           (2, 13),
           (3, 18),
           (4, 60),
           (5, 120),
           (6, 0),
           (7, 12),
           (8, 160)]

df_persons = spark.createDataFrame(persons, schema='id INT, age_months INT')

In [None]:
df_persons.withColumn('category', expr("CASE WHEN age_months BETWEEN 0 AND 2 THEN 'New Born'\
                                             WHEN age_months > 2 AND age_months <= 12 THEN 'Infant'\
                                             WHEN age_months > 12 and age_months <= 48 THEN 'Toddler'\
                                             WHEN age_months > 48 and age_months <= 144 THEN 'Kid'\
                                             ELSE 'Teenager or Adult' END")).show()

+---+----------+-----------------+
| id|age_months|         category|
+---+----------+-----------------+
|  1|         1|         New Born|
|  2|        13|          Toddler|
|  3|        18|          Toddler|
|  4|        60|              Kid|
|  5|       120|              Kid|
|  6|         0|         New Born|
|  7|        12|           Infant|
|  8|       160|Teenager or Adult|
+---+----------+-----------------+



In [None]:
df_persons.withColumn('category', when(df_persons.age_months.between(0, 2), 'New Born'). \
                                  when((df_persons.age_months > 2) & (df_persons.age_months <= 12), 'Infant'). \
                                  when((df_persons.age_months > 12) & (df_persons.age_months <= 48), 'Toddler'). \
                                  when((df_persons.age_months > 48) & (df_persons.age_months <= 144), 'Kid'). \
                                  otherwise('Teenager or Adult')).show()

+---+----------+-----------------+
| id|age_months|         category|
+---+----------+-----------------+
|  1|         1|         New Born|
|  2|        13|          Toddler|
|  3|        18|          Toddler|
|  4|        60|              Kid|
|  5|       120|              Kid|
|  6|         0|         New Born|
|  7|        12|           Infant|
|  8|       160|Teenager or Adult|
+---+----------+-----------------+



#### Filtering data form Spark dataframes

##### Filter and Where functions

In [None]:
users_df_5.show(truncate=False)

+---+----------+---------+----------------------+-------------------------------------------------+-----------+-----------+-------------------+-------------------+
|id |first_name|last_name|email                 |phone_numbers                                    |is_customer|amount_paid|customer_from      |last_updated_ts    |
+---+----------+---------+----------------------+-------------------------------------------------+-----------+-----------+-------------------+-------------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |[mobile -> +5493512500000]                       |true       |1000.0     |2021-01-25 00:00:00|2021-12-03 00:00:00|
|2  |Juan      |Estevanez|cvjestevanez@email.com|[mobile -> +5493512999999, home -> +548692214553]|true       |152.5      |2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+----------------------+-------------------------------------------------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_5.filter(users_df_5.id == 1).show()

+---+----------+---------+-----------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_5.where('id = 2').show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_5.where("first_name LIKE 'J%'").show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_5.createOrReplaceTempView('users')

In [None]:
spark.sql("""
          SELECT *
          FROM users
          WHERE first_name LIKE 'J%'
          """).show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-------------------+-------------------+



##### Filtering using conditions

In [None]:
user_8 = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'phone_numbers':{'mobile':'+5493512500000'},
         'city': '',
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':{'mobile':'+5493512999999', 'home':'+548692214553'},
         'city': None,
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)},
          
        {'id':3,
         'first_name':'Alejandro',
         'last_name':'Perez',
         'email':'cvaperez@email.com',
         'phone_numbers':{'mobile':'+5493512999555', 'home':'+548692214773'},
         'city': 'Los Angeles',
         'is_customer':False,
         'amount_paid':None,
         'customer_from':None,
         'last_updated_ts':datetime.datetime(2022,1,31)}
]

users_df_8 = spark.createDataFrame([Row(**i) for i in user_8])

In [None]:
users_df_8.where(users_df_8['is_customer'] == False).show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|         null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+



In [None]:
users_df_8.where('is_customer = True').show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.createOrReplaceTempView('users')

In [None]:
spark.sql("""
          SELECT *
          FROM users
          WHERE is_customer = True and (first_name LIKE 'J%' OR first_name LIKE 'A%')
          """).show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where((col('customer_from') >= '2020-06-05') & (col('customer_from') <= '2021-01-01')).show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('customer_from') != '2020-06-12').show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('amount_paid') == 1000).show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('amount_paid') != '1000').show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(isnull('amount_paid') | isnan('amount_paid')).show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|         null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+



In [None]:
users_df_8.where("amount_paid IS NULL AND first_name ='Alejandro'").show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|         null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+



In [None]:
users_df_8.show(truncate=False)

+---+----------+---------+----------------------+-------------------------------------------------+-----------+-----------+-----------+-------------------+-------------------+
|id |first_name|last_name|email                 |phone_numbers                                    |city       |is_customer|amount_paid|customer_from      |last_updated_ts    |
+---+----------+---------+----------------------+-------------------------------------------------+-----------+-----------+-----------+-------------------+-------------------+
|1  |Matias    |Dibo     |cvmdibo@email.com     |[mobile -> +5493512500000]                       |           |true       |1000.0     |2021-01-25 00:00:00|2021-12-03 00:00:00|
|2  |Juan      |Estevanez|cvjestevanez@email.com|[mobile -> +5493512999999, home -> +548692214553]|null       |true       |152.5      |2020-06-12 00:00:00|2022-01-04 00:00:00|
|3  |Alejandro |Perez    |cvaperez@email.com    |[mobile -> +5493512999555, home -> +548692214773]|Los Angeles|false    

In [None]:
users_df_8.where("last_updated_ts BETWEEN '2022-01' AND '2022-03'").show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('last_updated_ts').between('2022-01','2022-03')).show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(~isnull('amount_paid')).show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('amount_paid').isNotNull()).show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where("amount_paid IS NOT NULL").show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where((col('city') == '') | (col('city').isNull())).show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where("(city = '') OR (city IS NULL)").show()

+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
+---+----------+---------+--------------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where(col('city').isin('Los Angeles')).show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|         null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------+-------------------+



In [None]:
users_df_8.where("city IN ('Los Angeles','')").show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo| cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where((col('amount_paid') > 150) & ((isnan(col('city'))) | (~isnull(col('city'))))).show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where((col('amount_paid') > 150) & ((isnan(col('city'))) | (isnull(col('city')) == False))).show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_8.where("amount_paid > 150 AND city IS NOT NULL").show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



#### Dropping columns from Spark dataframes

##### Droping a single column from a dataframe

In [None]:
users_df_8.drop('email').columns

['id',
 'first_name',
 'last_name',
 'phone_numbers',
 'city',
 'is_customer',
 'amount_paid',
 'customer_from',
 'last_updated_ts']

In [None]:
users_df_8.drop(users_df_8.email).columns

['id',
 'first_name',
 'last_name',
 'phone_numbers',
 'city',
 'is_customer',
 'amount_paid',
 'customer_from',
 'last_updated_ts']

##### Droping multiple columns from a dataframe

In [None]:
users_df_8.drop('email','city').columns

['id',
 'first_name',
 'last_name',
 'phone_numbers',
 'is_customer',
 'amount_paid',
 'customer_from',
 'last_updated_ts']

##### Droping list of columns from a dataframe

In [None]:
drop_list = ['email','city']

users_df_8.drop(*drop_list).columns

['id',
 'first_name',
 'last_name',
 'phone_numbers',
 'is_customer',
 'amount_paid',
 'customer_from',
 'last_updated_ts']

##### Droping duplicate records from a dataframe

In [None]:
 user_duplicated = [
        {'id':1,
         'first_name':'Matias',
         'last_name':'Dibo',
         'email':'cvmdibo@email.com',
         'phone_numbers':{'mobile':'+5493512500000'},
         'city': '',
         'is_customer':True,
         'amount_paid':1000.0,
         'customer_from':datetime.datetime(2021,1,25),
         'last_updated_ts':datetime.datetime(2021,12,3)},
        
        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':{'mobile':'+5493512999999', 'home':'+548692214553'},
         'city': None,
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)},

        {'id':2,
         'first_name':'Juan',
         'last_name':'Estevanez',
         'email':'cvjestevanez@email.com',
         'phone_numbers':{'mobile':'+5493512999999', 'home':'+548692214553'},
         'city': None,
         'is_customer':True,
         'amount_paid':152.5,
         'customer_from':datetime.datetime(2020,6,12),
         'last_updated_ts':datetime.datetime(2022,1,4)},
          
        {'id':3,
         'first_name':'Alejandro',
         'last_name':'Perez',
         'email':'cvaperez@email.com',
         'phone_numbers':{'mobile':'+5493512999555', 'home':'+548692214773'},
         'city': 'Los Angeles',
         'is_customer':False,
         'amount_paid':None,
         'customer_from':None,
         'last_updated_ts':datetime.datetime(2022,1,31)},

        {'id':None,
         'first_name':None,
         'last_name':None,
         'email':None,
         'phone_numbers':None,
         'city': None,
         'is_customer':None,
         'amount_paid':None,
         'customer_from':None,
         'last_updated_ts':None},

        {'id':4,
         'first_name':'Santiago',
         'last_name':'Rosas',
         'email':'cvsrosas@email.com',
         'phone_numbers':{'mobile':'+5493512999444', 'home':'+548692214443'},
         'city': 'New York',
         'is_customer':False,
         'amount_paid':500.0,
         'customer_from':None,
         'last_updated_ts':datetime.datetime(2022,1,31)}
]

users_df_duplicated = spark.createDataFrame([Row(**i) for i in user_duplicated])

In [None]:
users_df_duplicated.show()

+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|   1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|   2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|   2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|   3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null

In [None]:
users_df_duplicated.drop_duplicates(['id']).orderBy('id').show()

+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|null|      null|     null|                null|                null|       null|       null|       null|               null|               null|
|   1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|   2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|   3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null

In [None]:
users_df_duplicated.dropDuplicates(['is_customer','customer_from']).orderBy('id').show()

+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+----+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|null|      null|     null|                null|                null|       null|       null|       null|               null|               null|
|   1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|   2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|   3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null

##### Droping null records from a dataframe

In [None]:
users_df_duplicated.na.drop(how='any').show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_duplicated.dropna(how='any').show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_duplicated.na.drop(how='all').show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-0

In [None]:
users_df_duplicated.dropna(how='any').show()

+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|            email|       phone_numbers|city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|cvmdibo@email.com|[mobile -> +54935...|    |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+-----------------+--------------------+----+-----------+-----------+-------------------+-------------------+



In [None]:
users_df_duplicated.na.drop(how='all', subset=['city']).show()

+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|             email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo| cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  3| Alejandro|    Perez|cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
|  4|  Santiago|    Rosas|cvsrosas@email.com|[mobile -> +54935...|   New York|      false|      500.0|               null|2022-01-31 00:00:00|
+---+----------+---------+------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+

#### Sorting data in Spark dataframes

##### Sort Spark dataframes in ascending or descending order by a given column

In [None]:
users_df_8.orderBy('first_name', ascending=False).show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+------

In [None]:
users_df_8.orderBy(users_df_8.first_name.desc()).show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+------

In [None]:
users_df_8.sort('customer_from').show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+------

In [None]:
users_df_8.sort('last_updated_ts', ascending=False).show()

+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
| id|first_name|last_name|               email|       phone_numbers|       city|is_customer|amount_paid|      customer_from|    last_updated_ts|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+-------------------+
|  3| Alejandro|    Perez|  cvaperez@email.com|[mobile -> +54935...|Los Angeles|      false|       null|               null|2022-01-31 00:00:00|
|  2|      Juan|Estevanez|cvjestevanez@emai...|[mobile -> +54935...|       null|       true|      152.5|2020-06-12 00:00:00|2022-01-04 00:00:00|
|  1|    Matias|     Dibo|   cvmdibo@email.com|[mobile -> +54935...|           |       true|     1000.0|2021-01-25 00:00:00|2021-12-03 00:00:00|
+---+----------+---------+--------------------+--------------------+-----------+-----------+-----------+-------------------+------

##### Composite sorting a dataframe

In [None]:
orders.orderBy(['order_status','id'], ascending=[1, 1]).show(10)

+---+----------+----------------+------------+
| id|order_date|oder_customer_id|order_status|
+---+----------+----------------+------------+
| 50|2013-07-25|            5225|    CANCELED|
|112|2013-07-26|            5375|    CANCELED|
|527|2013-07-28|            5426|    CANCELED|
|552|2013-07-28|            1445|    CANCELED|
|564|2013-07-28|            2216|    CANCELED|
|607|2013-07-28|            6376|    CANCELED|
|649|2013-07-28|            7261|    CANCELED|
|667|2013-07-28|            4726|    CANCELED|
|716|2013-07-29|            2581|    CANCELED|
|717|2013-07-29|            8208|    CANCELED|
+---+----------+----------------+------------+
only showing top 10 rows



In [None]:
orders.orderBy(orders.order_status.desc(), orders.id.asc()).show(10)

+---+----------+----------------+---------------+
| id|order_date|oder_customer_id|   order_status|
+---+----------+----------------+---------------+
| 69|2013-07-25|            2821|SUSPECTED_FRAUD|
|117|2013-07-26|              58|SUSPECTED_FRAUD|
|246|2013-07-26|            9616|SUSPECTED_FRAUD|
|320|2013-07-26|           10698|SUSPECTED_FRAUD|
|329|2013-07-26|            1944|SUSPECTED_FRAUD|
|411|2013-07-27|           11164|SUSPECTED_FRAUD|
|423|2013-07-27|            9632|SUSPECTED_FRAUD|
|453|2013-07-27|             381|SUSPECTED_FRAUD|
|548|2013-07-28|            6889|SUSPECTED_FRAUD|
|580|2013-07-28|            8677|SUSPECTED_FRAUD|
+---+----------+----------------+---------------+
only showing top 10 rows



#### Aggregations on Spark dataframes

##### Loading dataframe for aggregations

In [None]:
orders_items_schema = '''
                      order_item_id INT,
                      order_item_order_id INT,
                      order_item_product_id INT,
                      order_item_quantity INT,
                      order_item_product_subtotal DOUBLE,
                      order_item_product_price DOUBLE
                      '''

order_items_df = spark.read.csv('/content/drive/MyDrive/retail_db-master/csv_files/order_items', schema=orders_items_schema)

In [None]:
order_items_df.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_product_subtotal: double (nullable = true)
 |-- order_item_product_price: double (nullable = true)



In [None]:
lst_order_cols = ['order_item_id','order_item_order_id','order_item_product_id',
                  'order_item_product_price','order_item_quantity','order_item_product_subtotal']

order_items_df = order_items_df.select(*lst_order_cols)

##### Common aggregate functions

In [None]:
# Number of rows within the dataframe
orders.select(count('*')).show()

+--------+
|count(1)|
+--------+
|   68883|
+--------+



In [None]:
orders.groupBy('order_status').count().show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



In [None]:
orders.groupBy('order_status').agg(count('*')).show()

+---------------+--------+
|   order_status|count(1)|
+---------------+--------+
|PENDING_PAYMENT|   15030|
|       COMPLETE|   22899|
|        ON_HOLD|    3798|
| PAYMENT_REVIEW|     729|
|     PROCESSING|    8275|
|         CLOSED|    7556|
|SUSPECTED_FRAUD|    1558|
|        PENDING|    7610|
|       CANCELED|    1428|
+---------------+--------+



In [None]:
orders.groupBy('order_status').min().show()

+---------------+-------+---------------------+
|   order_status|min(id)|min(oder_customer_id)|
+---------------+-------+---------------------+
|PENDING_PAYMENT|      2|                    2|
|       COMPLETE|      3|                    1|
|        ON_HOLD|     46|                    2|
| PAYMENT_REVIEW|     11|                   12|
|     PROCESSING|      8|                    3|
|         CLOSED|      1|                    4|
|SUSPECTED_FRAUD|     69|                   16|
|        PENDING|     21|                    3|
|       CANCELED|     50|                   18|
+---------------+-------+---------------------+



In [None]:
orders.groupBy('order_status').min('id').show()

+---------------+-------+
|   order_status|min(id)|
+---------------+-------+
|PENDING_PAYMENT|      2|
|       COMPLETE|      3|
|        ON_HOLD|     46|
| PAYMENT_REVIEW|     11|
|     PROCESSING|      8|
|         CLOSED|      1|
|SUSPECTED_FRAUD|     69|
|        PENDING|     21|
|       CANCELED|     50|
+---------------+-------+



In [None]:
orders.groupBy('order_status').max('oder_customer_id').show()

+---------------+---------------------+
|   order_status|max(oder_customer_id)|
+---------------+---------------------+
|PENDING_PAYMENT|                12434|
|       COMPLETE|                12434|
|        ON_HOLD|                12434|
| PAYMENT_REVIEW|                12433|
|     PROCESSING|                12431|
|         CLOSED|                12434|
|SUSPECTED_FRAUD|                12429|
|        PENDING|                12435|
|       CANCELED|                12435|
+---------------+---------------------+



In [None]:
order_items_df.groupBy('order_item_order_id').sum().show(10)

+-------------------+------------------+------------------------+--------------------------+-----------------------------+------------------------+--------------------------------+
|order_item_order_id|sum(order_item_id)|sum(order_item_order_id)|sum(order_item_product_id)|sum(order_item_product_price)|sum(order_item_quantity)|sum(order_item_product_subtotal)|
+-------------------+------------------+------------------------+--------------------------+-----------------------------+------------------------+--------------------------------+
|                148|              1047|                     444|                      1407|                       229.99|                       8|                          479.99|
|                463|              4522|                    1852|                      1685|           249.97000000000003|                      13|               829.9200000000001|
|                471|              2307|                     942|                      1030|   

In [None]:
order_items_df.select('order_item_order_id','order_item_quantity','order_item_product_subtotal')\
              .where(col('order_item_order_id') == 2)\
              .show()

+-------------------+-------------------+---------------------------+
|order_item_order_id|order_item_quantity|order_item_product_subtotal|
+-------------------+-------------------+---------------------------+
|                  2|                  1|                     199.99|
|                  2|                  5|                      250.0|
|                  2|                  1|                     129.99|
+-------------------+-------------------+---------------------------+



In [None]:
# Get revenue using order_item_subtotal for a given order_item_order_id

order_items_df.select('order_item_order_id','order_item_product_subtotal')\
              .where(col('order_item_order_id') == 2)\
              .agg(sum('order_item_product_subtotal'))\
              .show()

+--------------------------------+
|sum(order_item_product_subtotal)|
+--------------------------------+
|                          579.98|
+--------------------------------+



In [None]:
order_items_df.select('order_item_order_id','order_item_product_subtotal','order_item_quantity')\
              .where(col('order_item_order_id') == 2)\
              .select(count('order_item_quantity').alias('num_orders'),
                      sum('order_item_quantity').alias('quantity_orders'),
                      sum('order_item_product_subtotal').alias('subtotal'),
                      round(sum('order_item_product_subtotal')/count('order_item_quantity'),2).alias('avg_sale_per_order'))\
              .show()

+----------+---------------+--------+------------------+
|num_orders|quantity_orders|subtotal|avg_sale_per_order|
+----------+---------------+--------+------------------+
|         3|              7|  579.98|            193.33|
+----------+---------------+--------+------------------+



In [None]:
# Get revenue using order_item_subtotal for each order_item_order_id

order_items_df.groupBy('order_item_order_id')\
              .agg(round(sum('order_item_product_subtotal'),2)
              .alias('sum_subtotal'))\
              .orderBy('sum_subtotal', ascending=False)\
              .show(10)

+-------------------+------------+
|order_item_order_id|sum_subtotal|
+-------------------+------------+
|              68703|     3449.91|
|              68724|     2859.89|
|              68858|     2839.91|
|              68809|     2779.86|
|              68766|      2699.9|
|              68806|     2629.92|
|              68821|     2629.92|
|              68778|      2629.9|
|              68848|     2399.96|
|              68875|     2399.95|
+-------------------+------------+
only showing top 10 rows



##### Getting count of Spark dataframe

In [None]:
# Num of rows in the dataframes
order_items_df.count()

172198

In [None]:
# Num of rows in the dataframes (a dataframe is created with this method)
order_items_df.select(count('*')).show()

+--------+
|count(1)|
+--------+
|  172198|
+--------+



##### Grouped aggregations using direct functions on Spark dataframes

In [None]:
order_items_grouped = order_items_df.groupBy('order_item_order_id')

In [None]:
order_items_grouped. \
      count(). \
      withColumnRenamed('count','order_count'). \
      show(5)

+-------------------+-----------+
|order_item_order_id|order_count|
+-------------------+-----------+
|                148|          3|
|                463|          4|
|                471|          2|
|                496|          5|
|               1088|          2|
+-------------------+-----------+
only showing top 5 rows



In [None]:
# Sum of all numeric fields
order_items_grouped. \
      sum(). \
      show(5)

+-------------------+------------------+------------------------+--------------------------+-----------------------------+------------------------+--------------------------------+
|order_item_order_id|sum(order_item_id)|sum(order_item_order_id)|sum(order_item_product_id)|sum(order_item_product_price)|sum(order_item_quantity)|sum(order_item_product_subtotal)|
+-------------------+------------------+------------------------+--------------------------+-----------------------------+------------------------+--------------------------------+
|                148|              1047|                     444|                      1407|                       229.99|                       8|                          479.99|
|                463|              4522|                    1852|                      1685|           249.97000000000003|                      13|               829.9200000000001|
|                471|              2307|                     942|                      1030|   

In [None]:
order_items_df.select('order_item_order_id','order_item_quantity','order_item_product_subtotal'). \
              groupBy('order_item_order_id').sum().show(5)

+-------------------+------------------------+------------------------+--------------------------------+
|order_item_order_id|sum(order_item_order_id)|sum(order_item_quantity)|sum(order_item_product_subtotal)|
+-------------------+------------------------+------------------------+--------------------------------+
|                148|                     444|                       8|                          479.99|
|                463|                    1852|                      13|               829.9200000000001|
|                471|                     942|                       2|              169.98000000000002|
|                496|                    2480|                       7|              441.95000000000005|
|               1088|                    2176|                       3|              249.97000000000003|
+-------------------+------------------------+------------------------+--------------------------------+
only showing top 5 rows



In [None]:
# We can use .toDF() to rename columns that are the result of aggregations
order_items_grouped.sum('order_item_quantity','order_item_product_subtotal'). \
                    toDF('order_item_order_id','order_quantity','order_revenue'). \
                    withColumn('order_revenue', round(col('order_revenue'),2)). \
                    show(5)

+-------------------+--------------+-------------+
|order_item_order_id|order_quantity|order_revenue|
+-------------------+--------------+-------------+
|                148|             8|       479.99|
|                463|            13|       829.92|
|                471|             2|       169.98|
|                496|             7|       441.95|
|               1088|             3|       249.97|
+-------------------+--------------+-------------+
only showing top 5 rows



In [None]:
# Get sum of numeric fields 
# order_date and order_status are ignored as they are not numeric fields

orders.groupBy('order_date'). \
    sum(). \
    show(5)

+----------+-------+---------------------+
|order_date|sum(id)|sum(oder_customer_id)|
+----------+-------+---------------------+
|2013-09-09|3087782|              1499494|
|2013-09-19|2830266|              1300695|
|2014-06-03|7200647|               793696|
|2013-09-12|2898570|              1125297|
|2014-01-24|6326762|              1010050|
+----------+-------+---------------------+
only showing top 5 rows



##### Grouped aggregations using agg on Spark dataframes

In [None]:
order_items_grouped.agg(sum('order_item_quantity').alias('order_quantity')
                      , round(sum('order_item_product_subtotal'),2).alias('order_revenue')). \
                   show(5)

+-------------------+--------------+-------------+
|order_item_order_id|order_quantity|order_revenue|
+-------------------+--------------+-------------+
|                148|             8|       479.99|
|                463|            13|       829.92|
|                471|             2|       169.98|
|                496|             7|       441.95|
|               1088|             3|       249.97|
+-------------------+--------------+-------------+
only showing top 5 rows



In [None]:
order_items_grouped.agg(sum('order_item_quantity'), round(sum('order_item_product_subtotal'),2)). \
                    toDF('order_item_order_id','order_quantity','order_revenue'). \
                    show(5)

+-------------------+--------------+-------------+
|order_item_order_id|order_quantity|order_revenue|
+-------------------+--------------+-------------+
|                148|             8|       479.99|
|                463|            13|       829.92|
|                471|             2|       169.98|
|                496|             7|       441.95|
|               1088|             3|       249.97|
+-------------------+--------------+-------------+
only showing top 5 rows



In [None]:
order_items_grouped.agg({'order_item_quantity':'min', 'order_item_product_subtotal':'sum'}). \
                    toDF('order_item_order_id','order_revenue','order_quantity'). \
                    withColumn('order_revenue',round(col('order_revenue'),2)). \
                    show(5)

+-------------------+-------------+--------------+
|order_item_order_id|order_revenue|order_quantity|
+-------------------+-------------+--------------+
|                148|       479.99|             1|
|                463|       829.92|             1|
|                471|       169.98|             1|
|                496|       441.95|             1|
|               1088|       249.97|             1|
+-------------------+-------------+--------------+
only showing top 5 rows



#### Joining Spark dataframes

##### Setup dataframe

In [None]:
courses = [
           {'course_id':1,
            'course_title':'Mastering Pyhton',
            'course_published_at':datetime.date(2021,1,14),
            'is_active':True,
            'last_updated_ts':datetime.datetime(2021, 2, 18, 16, 57, 25)},
           
           {'course_id':2,
            'course_title':'Data Engineering Essentials',
            'course_published_at':datetime.date(2021,2,10),
            'is_active':True,
            'last_updated_ts':datetime.datetime(2021, 3, 5, 12, 7, 33)},

           {'course_id':3,
            'course_title':'Mastering Pyspark',
            'course_published_at':datetime.date(2021,1,7),
            'is_active':True,
            'last_updated_ts':datetime.datetime(2021, 4, 6, 10, 5, 36)},

           {'course_id':4,
            'course_title':'AWS Essentials',
            'course_published_at':datetime.date(2021,3,19),
            'is_active':False,
            'last_updated_ts':datetime.datetime(2021, 4, 10, 2, 25, 36)},

           {'course_id':5,
            'course_title':'Docker 101',
            'course_published_at':datetime.date(2021,2,28),
            'is_active':True,
            'last_updated_ts':datetime.datetime(2021, 3, 21, 7, 18, 52)}          
]

courses_df = spark.createDataFrame([Row(**x) for x in courses])

In [None]:
users_join = [
              {'user_id':1,
               'user_first_name':'Sandra',
               'user_last_name':'Karpov',
               'user_mail':'skarpov@sraj.com'},
              
              {'user_id':2,
               'user_first_name':'Jacob',
               'user_last_name':'Clinton',
               'user_mail':'jclinton@sraj.com'},
              
              {'user_id':3,
               'user_first_name':'Kyle',
               'user_last_name':'Amstrong',
               'user_mail':'kamstrong@sraj.com'},
              
              {'user_id':4,
               'user_first_name':'Joanna',
               'user_last_name':'Spenneck',
               'user_mail':'jspenneck@sraj.com'},
              
              {'user_id':5,
               'user_first_name':'Kyle',
               'user_last_name':'Amstrong',
               'user_mail':'kamstrong@sraj.com'},
              
              {'user_id':6,
               'user_first_name':'Augy',
               'user_last_name':'Christon',
               'user_mail':'achriston@sraj.com'},
              
              {'user_id':7,
               'user_first_name':'Trudey',
               'user_last_name':'Choupin',
               'user_mail':'tchoupin@sraj.com'},
              
              {'user_id':8,
               'user_first_name':'Nadline',
               'user_last_name':'Grindsel',
               'user_mail':'ngrindsel@sraj.com'},
              
              {'user_id':9,
               'user_first_name':'Vassily',
               'user_last_name':'Tamas',
               'user_mail':'vtamas@sraj.com'},
              
              {'user_id':10,
               'user_first_name':'Wells',
               'user_last_name':'Simpkins',
               'user_mail':'wsimpkins@sraj.com'}
]

users_df = spark.createDataFrame([Row(**i) for i in users_join])

In [None]:
course_enrollments = [
                      {'course_enrolment_id':1,
                       'user_id':10,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':2,
                       'user_id':5,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':3,
                       'user_id':7,
                       'course_id':5,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':4,
                       'user_id':9,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':5,
                       'user_id':8,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':6,
                       'user_id':5,
                       'course_id':5,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':7,
                       'user_id':4,
                       'course_id':5,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':8,
                       'user_id':7,
                       'course_id':3,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':9,
                       'user_id':8,
                       'course_id':5,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':10,
                       'user_id':3,
                       'course_id':3,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':11,
                       'user_id':7,
                       'course_id':5,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':12,
                       'user_id':3,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':13,
                       'user_id':10,
                       'course_id':2,
                       'price_paid':9.99},
                      
                      {'course_enrolment_id':14,
                       'user_id':4,
                       'course_id':3,
                       'price_paid':10.99},
                      
                      {'course_enrolment_id':15,
                       'user_id':8,
                       'course_id':2,
                       'price_paid':9.99}
]

course_enrollments_df = spark.createDataFrame([Row(**i) for i in course_enrollments])

##### Performing Inner Join on Spark dataframes

In [None]:
# we can pass the condition specifying the columns of each dataframe
# either both dataframes has the same or different columns name
users_df.join(course_enrollments_df, users_df.user_id == course_enrollments_df.user_id).show(5)

+-------+---------------+--------------+------------------+-------------------+-------+---------+----------+
|user_id|user_first_name|user_last_name|         user_mail|course_enrolment_id|user_id|course_id|price_paid|
+-------+---------------+--------------+------------------+-------------------+-------+---------+----------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  3|      7|        5|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  8|      7|        3|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                 11|      7|        5|      9.99|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|                  4|      9|        2|      9.99|
|      5|           Kyle|      Amstrong|kamstrong@sraj.com|                  2|      5|        2|      9.99|
+-------+---------------+--------------+------------------+-------------------+-------+---------+----------+
only showing top 5 

In [None]:
# as both dataframes have user_id using same name, we can pass column name as string as well
users_df.join(course_enrollments_df, 'user_id').show(5)

+-------+---------------+--------------+------------------+-------------------+---------+----------+
|user_id|user_first_name|user_last_name|         user_mail|course_enrolment_id|course_id|price_paid|
+-------+---------------+--------------+------------------+-------------------+---------+----------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  3|        5|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  8|        3|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                 11|        5|      9.99|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|                  4|        2|      9.99|
|      5|           Kyle|      Amstrong|kamstrong@sraj.com|                  2|        2|      9.99|
+-------+---------------+--------------+------------------+-------------------+---------+----------+
only showing top 5 rows



In [None]:
# Get all columns from users_df as well as course_enrollment_id and course_id from course_enrollments_df
users_df.join(course_enrollments_df, 'user_id'). \
        select(users_df['*'], course_enrollments_df['course_enrolment_id'], course_enrollments_df['course_id']). \
        show(5)

+-------+---------------+--------------+------------------+-------------------+---------+
|user_id|user_first_name|user_last_name|         user_mail|course_enrolment_id|course_id|
+-------+---------------+--------------+------------------+-------------------+---------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  3|        5|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  8|        3|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                 11|        5|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|                  4|        2|
|      5|           Kyle|      Amstrong|kamstrong@sraj.com|                  2|        2|
+-------+---------------+--------------+------------------+-------------------+---------+
only showing top 5 rows



In [None]:
# using alias
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id'). \
         select('u.*', 'ce.course_id','ce.course_enrolment_id').show(5)

+-------+---------------+--------------+------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|         user_mail|course_id|course_enrolment_id|
+-------+---------------+--------------+------------------+---------+-------------------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                  3|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        3|                  8|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                 11|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|        2|                  4|
|      5|           Kyle|      Amstrong|kamstrong@sraj.com|        2|                  2|
+-------+---------------+--------------+------------------+---------+-------------------+
only showing top 5 rows



In [None]:
# Get number of courses enrolled by each user
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id'). \
         groupBy('user_id'). \
         count(). \
         orderBy('user_id'). \
         show(5)

+-------+-----+
|user_id|count|
+-------+-----+
|      3|    2|
|      4|    2|
|      5|    2|
|      7|    3|
|      8|    3|
+-------+-----+
only showing top 5 rows



##### Performing outer join using 'left' between spark dataframes

In [None]:
# Get all the users details along with the course enrollment details (if the user has any course enrollment)
users_df.join(course_enrollments_df, 'user_id', 'left').show(5) # we can also use 'left_outer' or 'leftouter'

+-------+---------------+--------------+------------------+-------------------+---------+----------+
|user_id|user_first_name|user_last_name|         user_mail|course_enrolment_id|course_id|price_paid|
+-------+---------------+--------------+------------------+-------------------+---------+----------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  3|        5|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                  8|        3|     10.99|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|                 11|        5|      9.99|
|      6|           Augy|      Christon|achriston@sraj.com|               null|     null|      null|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|                  4|        2|      9.99|
+-------+---------------+--------------+------------------+-------------------+---------+----------+
only showing top 5 rows



In [None]:
# using alias
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id', 'left'). \
         select('u.*', 'ce.course_id','ce.course_enrolment_id').show(5)

+-------+---------------+--------------+------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|         user_mail|course_id|course_enrolment_id|
+-------+---------------+--------------+------------------+---------+-------------------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                  3|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        3|                  8|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                 11|
|      6|           Augy|      Christon|achriston@sraj.com|     null|               null|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|        2|                  4|
+-------+---------------+--------------+------------------+---------+-------------------+
only showing top 5 rows



In [None]:
# using alias
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id', 'left'). \
         select('u.*', 'ce.course_id','ce.course_enrolment_id'). \
         where("ce.course_id IS NOT NULL"). \
         show(5)

 #where(~isnull('ce.course_id'))        

+-------+---------------+--------------+------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|         user_mail|course_id|course_enrolment_id|
+-------+---------------+--------------+------------------+---------+-------------------+
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                  3|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        3|                  8|
|      7|         Trudey|       Choupin| tchoupin@sraj.com|        5|                 11|
|      9|        Vassily|         Tamas|   vtamas@sraj.com|        2|                  4|
|      5|           Kyle|      Amstrong|kamstrong@sraj.com|        2|                  2|
+-------+---------------+--------------+------------------+---------+-------------------+
only showing top 5 rows



In [None]:
# Return the number of courses enrolled by each user
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id', 'left'). \
         groupBy('user_id'). \
         agg(sum(when(isnull('course_enrolment_id'),0).otherwise(1)).alias('course_count')). \
         orderBy('user_id'). \
         show(5)

+-------+------------+
|user_id|course_count|
+-------+------------+
|      1|           0|
|      2|           0|
|      3|           2|
|      4|           2|
|      5|           2|
+-------+------------+
only showing top 5 rows



In [None]:
users_df.alias('u').join(course_enrollments_df.alias('ce'), 'user_id', 'left'). \
         groupBy('user_id'). \
         agg(sum(expr('''
                      CASE WHEN course_enrolment_id IS NULL THEN 0
                      ELSE 1
                      END
                      ''')).alias('enrollment_count')). \
         orderBy('user_id'). \
         show(5)

+-------+----------------+
|user_id|enrollment_count|
+-------+----------------+
|      1|               0|
|      2|               0|
|      3|               2|
|      4|               2|
|      5|               2|
+-------+----------------+
only showing top 5 rows



##### Performing outer join using 'right' between spark dataframes

In [None]:
# using alias
course_enrollments_df.alias('ce').join(users_df.alias('u'), 'user_id', 'right'). \
         show(5)

+-------+-------------------+---------+----------+---------------+--------------+------------------+
|user_id|course_enrolment_id|course_id|price_paid|user_first_name|user_last_name|         user_mail|
+-------+-------------------+---------+----------+---------------+--------------+------------------+
|      7|                  3|        5|     10.99|         Trudey|       Choupin| tchoupin@sraj.com|
|      7|                  8|        3|     10.99|         Trudey|       Choupin| tchoupin@sraj.com|
|      7|                 11|        5|      9.99|         Trudey|       Choupin| tchoupin@sraj.com|
|      6|               null|     null|      null|           Augy|      Christon|achriston@sraj.com|
|      9|                  4|        2|      9.99|        Vassily|         Tamas|   vtamas@sraj.com|
+-------+-------------------+---------+----------+---------------+--------------+------------------+
only showing top 5 rows



In [None]:
# All the operations and aggregations that are performed with 'right' join, works as same as 'left' join

##### Performing full outer join between Spark dataframes

In [None]:
users1 = [
          {'email':'alovett0@nsw.gov.ar',
           'first_name':'Aundrea',
           'last_name':'Lovett',
           'gender':'Male'},
          
          {'email':'bjowling1@nsw.gov.ar',
           'first_name':'Bettine',
           'last_name':'Jowling',
           'gender':'Female'},
          
          {'email':'rablitt2@nsw.gov.ar',
           'first_name':'Reggie',
           'last_name':'Ablitt',
           'gender':'Male'},
          
          {'email':'tgavahan3@nsw.gov.ar',
           'first_name':'Ted',
           'last_name':'Gavahan',
           'gender':'Female'},
          
          {'email':'ccastelan4@nsw.gov.ar',
           'first_name':'Chantal',
           'last_name':'Castelan',
           'gender':'Female'}
]

users2 = [
          {'email':'lbutland1@nsw.gov.ar',
           'first_name':'Lilas',
           'last_name':'Butland',
           'gender':'Female'},
          
          {'email':'fmancktelow@nsw.gov.ar',
           'first_name':'Farand',
           'last_name':'Mancktelow',
           'gender':'Female'},
          
          {'email':'rablitt2@nsw.gov.ar',
           'first_name':'Reggie',
           'last_name':'Ablitt',
           'gender':'Male'},
          
          {'email':'tgavahan3@nsw.gov.ar',
           'first_name':'Ted',
           'last_name':'Gavahan',
           'gender':'Female'},
          
          {'email':'kpitt@nsw.gov.ar',
           'first_name':'Katherine',
           'last_name':'Pitt',
           'gender':'Female'}
]

users1_df = spark.createDataFrame([Row(**i) for i in users1])
users2_df = spark.createDataFrame([Row(**i) for i in users2])

In [None]:
users1_df.join(users2_df, 'email', 'outer').show(truncate=False)

+----------------------+----------+---------+------+----------+----------+------+
|email                 |first_name|last_name|gender|first_name|last_name |gender|
+----------------------+----------+---------+------+----------+----------+------+
|kpitt@nsw.gov.ar      |null      |null     |null  |Katherine |Pitt      |Female|
|lbutland1@nsw.gov.ar  |null      |null     |null  |Lilas     |Butland   |Female|
|tgavahan3@nsw.gov.ar  |Ted       |Gavahan  |Female|Ted       |Gavahan   |Female|
|fmancktelow@nsw.gov.ar|null      |null     |null  |Farand    |Mancktelow|Female|
|ccastelan4@nsw.gov.ar |Chantal   |Castelan |Female|null      |null      |null  |
|rablitt2@nsw.gov.ar   |Reggie    |Ablitt   |Male  |Reggie    |Ablitt    |Male  |
|bjowling1@nsw.gov.ar  |Bettine   |Jowling  |Female|null      |null      |null  |
|alovett0@nsw.gov.ar   |Aundrea   |Lovett   |Male  |null      |null      |null  |
+----------------------+----------+---------+------+----------+----------+------+



##### Performing 'cross' join between Spark dataframes

In [None]:
users_df.join(courses_df).show(6)

+-------+---------------+--------------+------------------+---------+--------------------+-------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|         user_mail|course_id|        course_title|course_published_at|is_active|    last_updated_ts|
+-------+---------------+--------------+------------------+---------+--------------------+-------------------+---------+-------------------+
|      1|         Sandra|        Karpov|  skarpov@sraj.com|        1|    Mastering Pyhton|         2021-01-14|     true|2021-02-18 16:57:25|
|      1|         Sandra|        Karpov|  skarpov@sraj.com|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|      2|          Jacob|       Clinton| jclinton@sraj.com|        1|    Mastering Pyhton|         2021-01-14|     true|2021-02-18 16:57:25|
|      2|          Jacob|       Clinton| jclinton@sraj.com|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|      3|    

In [None]:
users_df.crossJoin(courses_df).show(6)

+-------+---------------+--------------+------------------+---------+--------------------+-------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|         user_mail|course_id|        course_title|course_published_at|is_active|    last_updated_ts|
+-------+---------------+--------------+------------------+---------+--------------------+-------------------+---------+-------------------+
|      1|         Sandra|        Karpov|  skarpov@sraj.com|        1|    Mastering Pyhton|         2021-01-14|     true|2021-02-18 16:57:25|
|      1|         Sandra|        Karpov|  skarpov@sraj.com|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|      2|          Jacob|       Clinton| jclinton@sraj.com|        1|    Mastering Pyhton|         2021-01-14|     true|2021-02-18 16:57:25|
|      2|          Jacob|       Clinton| jclinton@sraj.com|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|      3|    

In [None]:
users_df.join(courses_df).where(col('user_first_name') == 'Trudey').show()

+-------+---------------+--------------+-----------------+---------+--------------------+-------------------+---------+-------------------+
|user_id|user_first_name|user_last_name|        user_mail|course_id|        course_title|course_published_at|is_active|    last_updated_ts|
+-------+---------------+--------------+-----------------+---------+--------------------+-------------------+---------+-------------------+
|      7|         Trudey|       Choupin|tchoupin@sraj.com|        1|    Mastering Pyhton|         2021-01-14|     true|2021-02-18 16:57:25|
|      7|         Trudey|       Choupin|tchoupin@sraj.com|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|      7|         Trudey|       Choupin|tchoupin@sraj.com|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:36|
|      7|         Trudey|       Choupin|tchoupin@sraj.com|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|      7|         Tr

In [None]:
f'{users_df.join(courses_df).distinct().count()} = {users_df.count()} * {courses_df.count()}'

'50 = 10 * 5'

#### Reading data from Spark dataframes into files

##### Convert JSON data to parquet 

In [None]:
input_json_files = '/content/drive/MyDrive/retail_db-master/json_files/'
output_parquet_files = '/content/drive/MyDrive/retail_db-master/parquet_files/'

In [None]:
! ls /content/drive/MyDrive/retail_db-master/json_files

categories		 customers		order_items
create_db.sql		 departments		orders
create_db_tables_pg.sql  load_db_tables_pg.sql	products


In [None]:
for folder in os.listdir(input_json_files):
  if '.sql' not in folder:
    path = input_json_files + f'{folder}'
    df = spark.read.option("multiline","true").json(path)
    print(f'Converting data in {path} from json to parquet')
    df.coalesce(1).write.parquet(output_parquet_files + f'{folder}/', mode='overwrite')

Converting data in /content/drive/MyDrive/retail_db-master/json_files/orders from json to parquet
Converting data in /content/drive/MyDrive/retail_db-master/json_files/products from json to parquet
Converting data in /content/drive/MyDrive/retail_db-master/json_files/order_items from json to parquet
Converting data in /content/drive/MyDrive/retail_db-master/json_files/departments from json to parquet
Converting data in /content/drive/MyDrive/retail_db-master/json_files/customers from json to parquet
Converting data in /content/drive/MyDrive/retail_db-master/json_files/categories from json to parquet


In [None]:
df_parquet = spark.read.parquet('/content/drive/MyDrive/retail_db-master/parquet_files/orders')

In [None]:
df_parquet.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [None]:
df_parquet.show(3, truncate=False)

+-----------------+---------------------+--------+---------------+
|order_customer_id|order_date           |order_id|order_status   |
+-----------------+---------------------+--------+---------------+
|1                |2013-07-25 00:00:00.0|11599   |CLOSED         |
|2                |2013-07-25 00:00:00.0|256     |PENDING_PAYMENT|
|3                |2013-07-25 00:00:00.0|12111   |COMPLETE       |
+-----------------+---------------------+--------+---------------+
only showing top 3 rows



##### Convert csv data to pipe separated files 

In [None]:
input_csv_files = '/content/drive/MyDrive/retail_db-master/csv_files/'
output_pipe_files = '/content/drive/MyDrive/retail_db-master/pipe_files/'

In [None]:
for folder in os.listdir(input_csv_files):
  if '.sql' not in folder:
    path = input_csv_files + f'{folder}'
    df = spark.read.csv(path)
    print(f'Converting data in {path} from csv to pipe separated file')
    # coalesce() will return only 1 file in the target folder
    df.coalesce(1).write.mode("overwrite").csv(output_pipe_files + f'{folder}/', sep='|')

Converting data in /content/drive/MyDrive/retail_db-master/csv_files/orders from csv to pipe separated file
Converting data in /content/drive/MyDrive/retail_db-master/csv_files/categories from csv to pipe separated file
Converting data in /content/drive/MyDrive/retail_db-master/csv_files/customers from csv to pipe separated file
Converting data in /content/drive/MyDrive/retail_db-master/csv_files/products from csv to pipe separated file
Converting data in /content/drive/MyDrive/retail_db-master/csv_files/order_items from csv to pipe separated file
Converting data in /content/drive/MyDrive/retail_db-master/csv_files/departments from csv to pipe separated file


In [None]:
df_pipe = spark.read.csv('/content/drive/MyDrive/retail_db-master/pipe_files/orders',sep='|', inferSchema=True).toDF('order_id',
                                                                                                                     'order_date',
                                                                                                                     'order_customer_id',
                                                                                                                     'order_status')

In [None]:
# other way to pass options 
spark.read. \
          options(sep='|', inferSchema=True). \
          csv('/content/drive/MyDrive/retail_db-master/pipe_files/orders'). \
          toDF('order_id','order_date','order_customer_id','order_status').show(3)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 3 rows



In [None]:
df_pipe.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [None]:
df_pipe.show(3, truncate=False)

+--------+---------------------+-----------------+---------------+
|order_id|order_date           |order_customer_id|order_status   |
+--------+---------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00.0|11599            |CLOSED         |
|2       |2013-07-25 00:00:00.0|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00.0|12111            |COMPLETE       |
+--------+---------------------+-----------------+---------------+
only showing top 3 rows



#### Partitioning Spark dataframes

In [None]:
df_pipe_2 = df_pipe.alias('copy')

df_pipe_2 = df_pipe_2.withColumn('year', year('order_date')). \
                      withColumn('month', month('order_date'))

In [None]:
# This way the dataframe is partitioned by multiple columns
df_pipe_2.coalesce(1). \
          write. \
          mode('overwrite'). \
          partitionBy('year','month'). \
          parquet('/content/drive/MyDrive/retail_db-master/partitioned_files/')

In [None]:
! ls /content/drive/MyDrive/retail_db-master/partitioned_files/

 _SUCCESS  'year=2013'	'year=2014'


In [None]:
! ls /content/drive/MyDrive/retail_db-master/partitioned_files/year=2013

'month=10'  'month=11'	'month=12'  'month=7'  'month=8'  'month=9'


In [None]:
# other way to partiton by month could be the next one. This way is partitioned by one column
df_pipe_2.drop('year','month'). \
          withColumn('order_date', date_format('order_date', 'yyyyMM')). \
          coalesce(1). \
          write. \
          partitionBy('order_date'). \
          parquet('/content/drive/MyDrive/retail_db-master/partitioned_files/')

In [None]:
# check if partitioned dataframe is read from directory ok
df_pipe_2.count() == spark.read.parquet('/content/drive/MyDrive/retail_db-master/partitioned_files/').count()

True

#### Spark SQL functions

##### User defined functions

In [None]:
dc = spark.udf.register('date_convert', lambda x: int(x[:10].replace('-','')))

In [None]:
df_pipe.withColumn('date_converted', dc('order_date')).show(5)

+--------+--------------------+-----------------+---------------+--------------+
|order_id|          order_date|order_customer_id|   order_status|date_converted|
+--------+--------------------+-----------------+---------------+--------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|      20130725|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|      20130725|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|      20130725|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|      20130725|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|      20130725|
+--------+--------------------+-----------------+---------------+--------------+
only showing top 5 rows



In [None]:
df_pipe.where(dc('order_date') != 20130725).show(5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|     105|2013-07-26 00:00:...|             8220|       COMPLETE|
|     106|2013-07-26 00:00:...|              395|     PROCESSING|
|     107|2013-07-26 00:00:...|             1845|       COMPLETE|
|     108|2013-07-26 00:00:...|            12149|     PROCESSING|
|     109|2013-07-26 00:00:...|             9345|PENDING_PAYMENT|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [None]:
df_pipe.groupBy(dc('order_date').alias('order_date')). \
        count(). \
        withColumnRenamed('count','order_count'). \
        orderBy(col('order_date').asc()). \
        show(10)

+----------+-----------+
|order_date|order_count|
+----------+-----------+
|  20130725|        143|
|  20130726|        269|
|  20130727|        202|
|  20130728|        187|
|  20130729|        253|
|  20130730|        227|
|  20130731|        252|
|  20130801|        246|
|  20130802|        224|
|  20130803|        183|
+----------+-----------+
only showing top 10 rows



##### Using Spark UDFs as part of Spark SQL

In [None]:
df_pipe.selectExpr('date_convert(order_date) AS order_date').show(5)

+----------+
|order_date|
+----------+
|  20130725|
|  20130725|
|  20130725|
|  20130725|
|  20130725|
+----------+
only showing top 5 rows



In [None]:
df_pipe.createOrReplaceTempView('orders')

In [None]:
spark.sql('''
          SELECT *,
                date_convert(order_date) converted_date
          FROM orders
          WHERE date_convert(order_date) != 20130725;
          ''').show(5, truncate=False)

+--------+---------------------+-----------------+---------------+--------------+
|order_id|order_date           |order_customer_id|order_status   |converted_date|
+--------+---------------------+-----------------+---------------+--------------+
|105     |2013-07-26 00:00:00.0|8220             |COMPLETE       |20130726      |
|106     |2013-07-26 00:00:00.0|395              |PROCESSING     |20130726      |
|107     |2013-07-26 00:00:00.0|1845             |COMPLETE       |20130726      |
|108     |2013-07-26 00:00:00.0|12149            |PROCESSING     |20130726      |
|109     |2013-07-26 00:00:00.0|9345             |PENDING_PAYMENT|20130726      |
+--------+---------------------+-----------------+---------------+--------------+
only showing top 5 rows



In [None]:
spark.sql('''
          SELECT date_convert(order_date) converted_date, COUNT(*) order_counts
          FROM orders
          GROUP BY 1
          HAVING (converted_date > 20131231) AND (order_counts > 280)
          ORDER BY 2 DESC;
          ''').show(10, truncate=False)

+--------------+------------+
|converted_date|order_counts|
+--------------+------------+
|20140720      |285         |
|20140111      |281         |
+--------------+------------+



##### Cleansing data using Spark UDFs

In [None]:
courses_data = {'course_id':['1','2','3','4','5'],
                'course_name':['Mastering SQL','Streaming pipelines','Head First Pyhton',
                               'Designing Data Intensive Applications','Distributed Systems'],
                'course_status':['    published    ','   inactive ',' \\N  ', '   published','\\N']}

In [None]:
courses_df_ = spark.createDataFrame(pd.DataFrame(courses_data))

In [None]:
courses_df_.show()

+---------+--------------------+-----------------+
|course_id|         course_name|    course_status|
+---------+--------------------+-----------------+
|        1|       Mastering SQL|    published    |
|        2| Streaming pipelines|        inactive |
|        3|   Head First Pyhton|             \N  |
|        4|Designing Data In...|        published|
|        5| Distributed Systems|               \N|
+---------+--------------------+-----------------+



In [None]:
clean_course = spark.udf.register('cleaned_course', lambda x: x.strip() if x.strip() != '\\N' else None)

In [None]:
courses_df_.select('*', clean_course('course_status').alias('cleaned_course_status')).show(5)

+---------+--------------------+-----------------+---------------------+
|course_id|         course_name|    course_status|cleaned_course_status|
+---------+--------------------+-----------------+---------------------+
|        1|       Mastering SQL|    published    |            published|
|        2| Streaming pipelines|        inactive |             inactive|
|        3|   Head First Pyhton|             \N  |                 null|
|        4|Designing Data In...|        published|            published|
|        5| Distributed Systems|               \N|                 null|
+---------+--------------------+-----------------+---------------------+



In [None]:
courses_df_.createOrReplaceTempView('course_temp_view')

In [None]:
spark.sql('''
          SELECT *, cleaned_course(course_status) as cleaned_course
          FROM course_temp_view
          ''').show()

+---------+--------------------+-----------------+--------------+
|course_id|         course_name|    course_status|cleaned_course|
+---------+--------------------+-----------------+--------------+
|        1|       Mastering SQL|    published    |     published|
|        2| Streaming pipelines|        inactive |      inactive|
|        3|   Head First Pyhton|             \N  |          null|
|        4|Designing Data In...|        published|     published|
|        5| Distributed Systems|               \N|          null|
+---------+--------------------+-----------------+--------------+



In [None]:
spark.sql('''
          SELECT *, coalesce(cleaned_course(course_status), "not cleaned") as cleaned_course
          FROM course_temp_view
          ''').show()

+---------+--------------------+-----------------+--------------+
|course_id|         course_name|    course_status|cleaned_course|
+---------+--------------------+-----------------+--------------+
|        1|       Mastering SQL|    published    |     published|
|        2| Streaming pipelines|        inactive |      inactive|
|        3|   Head First Pyhton|             \N  |   not cleaned|
|        4|Designing Data In...|        published|     published|
|        5| Distributed Systems|               \N|   not cleaned|
+---------+--------------------+-----------------+--------------+

