### PySpark DataFrames: a Comprehensive Guide

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, BooleanType
import pyspark.sql.types as T
from pyspark.sql import functions as F
from pyspark.sql.functions import col, column
from pyspark.sql.window import Window

#### Dataframes

##### Row objects

In [0]:
row = Row(name="John", role="Data scientist", salary=4500)
print("Name: ", row.name)
print("Salary: ", row['role'])
print("Salary: ", row[2])

Name:  John
Salary:  Data scientist
Salary:  4500


In [0]:
row = Row(name="John", role="Data scientist", salary=None)

In [0]:
row = Row("John", "Data scientist", 4500)
row[2]

Out[4]: 4500

In [0]:
Employee = Row("name", "role", "salary")
employee1 = Employee("John", "Data scientist", 4500)
print("Name: {}, Salary: {}".format(employee1.name, employee1.salary))

Name: John, Salary: 4500


##### Schema

In [0]:
someSchema = StructType([
  StructField("name", StringType(), True),
  StructField("role", StringType(), True),
  StructField("salary", IntegerType(), True)]
)
someSchema

Out[6]: StructType([StructField('name', StringType(), True), StructField('role', StringType(), True), StructField('salary', IntegerType(), True)])

In [0]:
ddl_schema = "`name` STRING, `role` STRING, `salary` INTEGER"
schema = T._parse_datatype_string(ddl_schema)
schema

Out[7]: StructType([StructField('name', StringType(), True), StructField('role', StringType(), True), StructField('salary', IntegerType(), True)])

##### Creating DataFrames

In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data)
df.show()

+-----+--------------+----+
|   _1|            _2|  _3|
+-----+--------------+----+
| John|Data scientist|4500|
|James| Data engineer|3200|
|Laura|Data scientist|4100|
|  Ali| Data engineer|3200|
|Steve|     Developer|3600|
+-----+--------------+----+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
cols = ["name","role", "salary"]
df = spark.createDataFrame(data).toDF(*cols)
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]

schema = StructType([
  StructField("name", StringType(), True),
  StructField("role", StringType(), True),
  StructField("salary", IntegerType(), True)]
)

df = spark.createDataFrame(data=data,schema=schema)
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]

ddl_schema = "`name` STRING, `role` STRING, `salary` INTEGER"

df = spark.createDataFrame(
  data,
  T._parse_datatype_string(ddl_schema)
)

df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data =[ 
  Row(name="John", role="Data scientist", salary=4500),
  Row(name="James", role="Data engineer", salary=3200),
  Row(name="Laura", role="Data scientist", salary=4100),
  Row(name="Ali", role="Data engineer", salary=3200),
  Row(name="Steve", role="Developer", salary=3600)
]

df = spark.createDataFrame(data)
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data =[ 
  Row("John", "Data scientist", 4500),
  Row("James", "Data engineer", 3200),
  Row("Laura", "Data scientist", 4100),
  Row("Ali", "Data engineer", 3200),
  Row("Steve", "Developer", 3600)
]

df = spark.createDataFrame(data).toDF("name", "role", "salary")
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
Employee = Row("name", "role", "salary")

data = [
  Employee("John", "Data scientist", 4500),
  Employee("James", "Data engineer", 3200),
  Employee("Laura", "Data scientist", 4100),
  Employee("Ali", "Data engineer", 3200),
  Employee("Steve", "Developer", 3600)
  ]

df = spark.createDataFrame(data)
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(["name","role", "salary"])
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
# createDataFrame() a list of Row/tuple/list/dict* or pandas.DataFrame, unless 
# schema with DataType is provided. So, this doesn't run

# data = [1,2,3,4,5]
# df1 = spark.createDataFrame(data).toDF("value")

In [0]:
data = [1,2,3,4,5]
df1 = spark.createDataFrame([(i,) for i in data]).toDF("value")
df1.show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+



In [0]:
data = [1,2,3,4,5]
df1 = spark.createDataFrame(data, T.IntegerType())
df1.show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+



In [0]:
# This doesn't run
# data = [1,2,3,4.0,5]
# df1 = spark.createDataFrame(data, T.IntegerType())
# df1.show()

In [0]:
data = [1,2,3,4.0,5]
df1 = spark.createDataFrame(map(float, data), T.FloatType())
df1.show()

+-----+
|value|
+-----+
|  1.0|
|  2.0|
|  3.0|
|  4.0|
|  5.0|
+-----+



##### Empty DataFrames

In [0]:
# Create empty an dataframe
df1 = spark.createDataFrame([], T.StructType([]))
df1.show()

++
||
++
++



In [0]:
ddl_schema = "`col1` INTEGER, `col2` INTEGER"
schema = T._parse_datatype_string(ddl_schema)
df1 = spark.createDataFrame([], schema)
df1.show()

+----+----+
|col1|col2|
+----+----+
+----+----+



##### Null values

In [0]:
data = [
  ("John", None, None),
  ("James", "Data engineer", 3200),
  ("Laura", None, 4100),
  (None, "Data engineer", 3200),
  ("Steve", "Developer", 3600)
  ]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
| John|         null|  null|
|James|Data engineer|  3200|
|Laura|         null|  4100|
| null|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
import pyspark.sql.types as T
data =[ 
  ("John", "Data scientist", None),
  ("James", "Data engineer", None),
  ("Laura", "Data scientist", None),
  ("Ali", "Data engineer", None),
  ("Steve", "Developer", None)
]

ddl_schema = "`name` STRING, `role` STRING, `salary` FLOAT"
df = spark.createDataFrame(
  data,
  T._parse_datatype_string(ddl_schema)
)
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  null|
|James| Data engineer|  null|
|Laura|Data scientist|  null|
|  Ali| Data engineer|  null|
|Steve|     Developer|  null|
+-----+--------------+------+



##### Dispalying DataFrames

In [0]:
df.show(3)

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  null|
|James| Data engineer|  null|
|Laura|Data scientist|  null|
+-----+--------------+------+
only showing top 3 rows



In [0]:
df.show(n=3, vertical=True)

-RECORD 0----------------
 name   | John           
 role   | Data scientist 
 salary | null           
-RECORD 1----------------
 name   | James          
 role   | Data engineer  
 salary | null           
-RECORD 2----------------
 name   | Laura          
 role   | Data scientist 
 salary | null           
only showing top 3 rows



In [0]:
display(df)

name,role,salary
John,Data scientist,
James,Data engineer,
Laura,Data scientist,
Ali,Data engineer,
Steve,Developer,


#### Showing the schema

In [0]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- salary: float (nullable = true)



In [0]:
df.schema

Out[30]: StructType([StructField('name', StringType(), True), StructField('role', StringType(), True), StructField('salary', FloatType(), True)])

#### Shape of DataFrame

In [0]:
df.count()

Out[31]: 5

In [0]:
df.columns

Out[32]: ['name', 'role', 'salary']

In [0]:
len(df.columns)

Out[33]: 3

In [0]:
# Check if a DataFrame is empty
df1 = spark.createDataFrame([], T.StructType([]))
df1.count() == 0

Out[34]: True

In [0]:
# Check if a DataFrame is empty
df1.isEmpty()

Out[35]: True

#### Displaying the rows 

In [0]:
df.head(3) 

Out[36]: [Row(name='John', role='Data scientist', salary=None),
 Row(name='James', role='Data engineer', salary=None),
 Row(name='Laura', role='Data scientist', salary=None)]

In [0]:
df.take(3)

Out[37]: [Row(name='John', role='Data scientist', salary=None),
 Row(name='James', role='Data engineer', salary=None),
 Row(name='Laura', role='Data scientist', salary=None)]

In [0]:
df.first()

Out[38]: Row(name='John', role='Data scientist', salary=None)

In [0]:
df.tail(2)

Out[39]: [Row(name='Ali', role='Data engineer', salary=None),
 Row(name='Steve', role='Developer', salary=None)]

In [0]:
df.limit(3).show() 

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  null|
|James| Data engineer|  null|
|Laura|Data scientist|  null|
+-----+--------------+------+



#### Calculating the statistics

In [0]:
df.describe().show()

+-------+-----+-------------+------+
|summary| name|         role|salary|
+-------+-----+-------------+------+
|  count|    5|            5|     0|
|   mean| null|         null|  null|
| stddev| null|         null|  null|
|    min|  Ali|Data engineer|  null|
|    max|Steve|    Developer|  null|
+-------+-----+-------------+------+



#### Columns

In [0]:
col1 = col("c1")
col2 = column("c2")
col1

Out[42]: Column<'c1'>

In [0]:
df.columns

Out[43]: ['name', 'role', 'salary']

In [0]:
df.name

Out[44]: Column<'name'>

In [0]:
df["name"]

Out[45]: Column<'name'>

##### Selecting columns

In [0]:
df.select("name").show()

+-----+
| name|
+-----+
| John|
|James|
|Laura|
|  Ali|
|Steve|
+-----+



In [0]:
df.select(col("name")).show()

+-----+
| name|
+-----+
| John|
|James|
|Laura|
|  Ali|
|Steve|
+-----+



In [0]:
df.select(df["name"]).show()

+-----+
| name|
+-----+
| John|
|James|
|Laura|
|  Ali|
|Steve|
+-----+



In [0]:
df.select(["name", "role"]).show()

+-----+--------------+
| name|          role|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
|  Ali| Data engineer|
|Steve|     Developer|
+-----+--------------+



In [0]:
df.select("name", "role").show()

+-----+--------------+
| name|          role|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
|  Ali| Data engineer|
|Steve|     Developer|
+-----+--------------+



In [0]:
df.select(col("*")).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  null|
|James| Data engineer|  null|
|Laura|Data scientist|  null|
|  Ali| Data engineer|  null|
|Steve|     Developer|  null|
+-----+--------------+------+



In [0]:
df.select(df.columns[:2]).show()

+-----+--------------+
| name|          role|
+-----+--------------+
| John|Data scientist|
|James| Data engineer|
|Laura|Data scientist|
|  Ali| Data engineer|
|Steve|     Developer|
+-----+--------------+



In [0]:
spark.range(1, 6).select(F.col("id").cast("double")).show()

+---+
| id|
+---+
|1.0|
|2.0|
|3.0|
|4.0|
|5.0|
+---+



##### Column expressions

In [0]:
df.select(col("salary") * 1.2 + 100).show()

+----------------------+
|((salary * 1.2) + 100)|
+----------------------+
|                  null|
|                  null|
|                  null|
|                  null|
|                  null|
+----------------------+



In [0]:
df.select(df["salary"] * 1.2 + 100).show()

+----------------------+
|((salary * 1.2) + 100)|
+----------------------+
|                  null|
|                  null|
|                  null|
|                  null|
|                  null|
+----------------------+



In [0]:
df.select(["name", df["salary"] * 1.2 + 100]).show()

+-----+----------------------+
| name|((salary * 1.2) + 100)|
+-----+----------------------+
| John|                  null|
|James|                  null|
|Laura|                  null|
|  Ali|                  null|
|Steve|                  null|
+-----+----------------------+



In [0]:
df.select(df["salary"] > 4000).show()

+---------------+
|(salary > 4000)|
+---------------+
|           null|
|           null|
|           null|
|           null|
|           null|
+---------------+



In [0]:
df.select(col("salary").between(3200,3600)).show()

+---------------------------------------+
|((salary >= 3200) AND (salary <= 3600))|
+---------------------------------------+
|                                   null|
|                                   null|
|                                   null|
|                                   null|
|                                   null|
+---------------------------------------+



In [0]:
df.select(F.concat(df["name"], F.lit("-"), df["role"])).show()

+---------------------+
|concat(name, -, role)|
+---------------------+
|  John-Data scientist|
|  James-Data engineer|
| Laura-Data scientist|
|    Ali-Data engineer|
|      Steve-Developer|
+---------------------+



##### Column aliases

In [0]:
col_expr = (col("salary") * 1.2 + 100).alias("bonus")

In [0]:
df.select(col_expr).show()

+-----+
|bonus|
+-----+
| null|
| null|
| null|
| null|
| null|
+-----+



In [0]:
df.select((col("salary") * 1.2 + 100).name("bonus")).show()

+-----+
|bonus|
+-----+
| null|
| null|
| null|
| null|
| null|
+-----+



In [0]:
df.select(F.expr("salary > 4000")).show()

+---------------+
|(salary > 4000)|
+---------------+
|           null|
|           null|
|           null|
|           null|
|           null|
+---------------+



##### String manipulation

In [0]:
df.select(F.lower(df['name']).alias('name')).show()

+-----+
| name|
+-----+
| john|
|james|
|laura|
|  ali|
|steve|
+-----+



##### Math functions

In [0]:
data = [1.0,2.0,3.0,4.0,5.0]
df1 = spark.createDataFrame(data, FloatType()).toDF("number")
df1.select(F.round(F.log("number"), 3)).show()

+--------------------+
|round(ln(number), 3)|
+--------------------+
|                 0.0|
|               0.693|
|               1.099|
|               1.386|
|               1.609|
+--------------------+



In [0]:
data = [0.6892, 206.892, 1268.0]
df1 = spark.createDataFrame([(i,) for i in data]).toDF("number")
df1.select(F.round(col("number"), -2)).show()

+-----------------+
|round(number, -2)|
+-----------------+
|              0.0|
|            200.0|
|           1300.0|
+-----------------+



##### Creating new columns

In [0]:
df1 = df.withColumn("vacation", F.lit(15))
df1.show()

+-----+--------------+------+--------+
| name|          role|salary|vacation|
+-----+--------------+------+--------+
| John|Data scientist|  null|      15|
|James| Data engineer|  null|      15|
|Laura|Data scientist|  null|      15|
|  Ali| Data engineer|  null|      15|
|Steve|     Developer|  null|      15|
+-----+--------------+------+--------+



In [0]:
df1 = df.withColumn("salary", col("salary") + 300)
df1.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  null|
|James| Data engineer|  null|
|Laura|Data scientist|  null|
|  Ali| Data engineer|  null|
|Steve|     Developer|  null|
+-----+--------------+------+



In [0]:
df1 = df.select(col("*"), (col("salary") * 1.2 + 100).name("bonus"))
df1.show()

+-----+--------------+------+-----+
| name|          role|salary|bonus|
+-----+--------------+------+-----+
| John|Data scientist|  null| null|
|James| Data engineer|  null| null|
|Laura|Data scientist|  null| null|
|  Ali| Data engineer|  null| null|
|Steve|     Developer|  null| null|
+-----+--------------+------+-----+



In [0]:
df1 = df.withColumns({"bonus1": col("salary") + 100, "bonus2": col("salary") + 300})
df1.show()

+-----+--------------+------+------+------+
| name|          role|salary|bonus1|bonus2|
+-----+--------------+------+------+------+
| John|Data scientist|  null|  null|  null|
|James| Data engineer|  null|  null|  null|
|Laura|Data scientist|  null|  null|  null|
|  Ali| Data engineer|  null|  null|  null|
|Steve|     Developer|  null|  null|  null|
+-----+--------------+------+------+------+



##### Renaming Columns

In [0]:
df1 =df.withColumnRenamed("name", "first name").withColumnRenamed("role", "job")
df1.show()

+----------+--------------+------+
|first name|           job|salary|
+----------+--------------+------+
|      John|Data scientist|  null|
|     James| Data engineer|  null|
|     Laura|Data scientist|  null|
|       Ali| Data engineer|  null|
|     Steve|     Developer|  null|
+----------+--------------+------+



In [0]:
# Only in Spark version 3.4.0 or later
# df1 =df.withColumnsRenamed({"name": "first name", "role": "job"})
# df1.show()

##### Changing the data type of columns

In [0]:
data = [
  ("2020-01-01 07:30:10.150007", "17.0"), 
  ("2020-01-02 07:30:10.150007", "25.5"),  
  ("2020-01-03 07:30:10.150007", "19.5"),  
  ("2020-01-04 07:30:10.150007", "21.2"),  
  ("2020-01-05 07:30:10.150007", "18.0"), 
  ("2020-01-06 07:30:10.150007", "20.5")
  ]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature", col("temperature").cast(DoubleType()))
df1.show(truncate=False)

+--------------------------+-----------+
|time                      |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0       |
|2020-01-02 07:30:10.150007|25.5       |
|2020-01-03 07:30:10.150007|19.5       |
|2020-01-04 07:30:10.150007|21.2       |
|2020-01-05 07:30:10.150007|18.0       |
|2020-01-06 07:30:10.150007|20.5       |
+--------------------------+-----------+



In [0]:
data = [
  ("2020-01-01 07:30:10.150007", "17.0"), 
  ("2020-01-02 07:30:10.150007", "25.5"),  
  ("2020-01-03 07:30:10.150007", "19.5"),  
  ("2020-01-04 07:30:10.150007", "21.2"),  
  ("2020-01-05 07:30:10.150007", "18.0"), 
  ("2020-01-06 07:30:10.150007", "20.5")
  ]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature", col("temperature").cast("double"))
df1.show(truncate=False)

+--------------------------+-----------+
|time                      |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0       |
|2020-01-02 07:30:10.150007|25.5       |
|2020-01-03 07:30:10.150007|19.5       |
|2020-01-04 07:30:10.150007|21.2       |
|2020-01-05 07:30:10.150007|18.0       |
|2020-01-06 07:30:10.150007|20.5       |
+--------------------------+-----------+



In [0]:
data =[ 
  ("1.0", 2),
  ("2.0", 4),
  ("3.0", 6),
  ("4.0", 8)
]
df1 = spark.createDataFrame(data).toDF("col1","col2")
df1.show()

+----+----+
|col1|col2|
+----+----+
| 1.0|   2|
| 2.0|   4|
| 3.0|   6|
| 4.0|   8|
+----+----+



In [0]:
df1.select((df1["col1"] >= 2).cast('int')).show()

+------------------------+
|CAST((col1 >= 2) AS INT)|
+------------------------+
|                       0|
|                       1|
|                       1|
|                       1|
+------------------------+



In [0]:
# Automatic conversion from string to float
df1.select(df1["col1"]+df1["col2"]).show()

+-------------+
|(col1 + col2)|
+-------------+
|          3.0|
|          6.0|
|          9.0|
|         12.0|
+-------------+



##### Dropping columns

In [0]:
df.drop("role", "salary").show()

+-----+
| name|
+-----+
| John|
|James|
|Laura|
|  Ali|
|Steve|
+-----+



In [0]:
cols = ["role", "salary"]
df.drop(*cols).show()

+-----+
| name|
+-----+
| John|
|James|
|Laura|
|  Ali|
|Steve|
+-----+



##### Dropping duplicates

In [0]:
data = [
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600),
  ("John", "Data scientist", 4500)
  ]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.dropDuplicates(["role"]).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|James| Data engineer|  3200|
| John|Data scientist|  4500|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df1.dropDuplicates().show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df1.select("role").distinct().show()

+--------------+
|          role|
+--------------+
|Data scientist|
| Data engineer|
|     Developer|
+--------------+



##### Splitting a column 

In [0]:
data = [
  ("John,Data scientist", 4500),
  ("James,Data engineer", 3200),
  ("Laura,Data scientist", 4100)
]
df1 = spark.createDataFrame(data).toDF("name-role", "salary")
df1.select(F.split(col("name-role"), ",") \
   .alias("splitted_column")).show(truncate=False)

+-----------------------+
|splitted_column        |
+-----------------------+
|[John, Data scientist] |
|[James, Data engineer] |
|[Laura, Data scientist]|
+-----------------------+



In [0]:
split_df = df1.select(F.split(col("name-role"), ",").alias("splitted_column"), col("salary"))
split_df.select(s
            col("splitted_column")[0].alias("name"),
            col("splitted_column")[1].alias("role"),
            col("salary")).show()

[0;36m  File [0;32m"<command-4080278363775537>"[0;36m, line [0;32m3[0m
[0;31m    col("splitted_column")[0].alias("name"),[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
split_df.select([col("splitted_column")[i].alias("col{}".format(i+1)) for i in range(2)]).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-4080278363775538>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0msplit_df[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m[[0m[0mcol[0m[0;34m([0m[0;34m"splitted_column"[0m[0;34m)[0m[0;34m[[0m[0mi[0m[0;34m][0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m"col{}"[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0mi[0m[0;34m+[0m[0;36m1[0m[0;34m)[0m[0;34m)[0m [0;32mfor[0m [0mi[0m [0;32min[0m [0mrange[0m[0;34m([0m[0;36m2[0m[0;34m)[0m[0;34m][0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mNameError[0m: name 'split_df' is not defined

#### Rows

##### Filtering the rows

In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")

df.filter(col("salary") == 3200).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|  Ali|Data engineer|  3200|
+-----+-------------+------+



In [0]:
df.where(col("salary") == 3200).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|  Ali|Data engineer|  3200|
+-----+-------------+------+



In [0]:
df.filter((col("salary") > 3200) & (col("name") != "Ali")).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|Laura|Data scientist|  4100|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df.filter("salary > 3200 and name != 'Ali'").show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|Laura|Data scientist|  4100|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df.filter(col("salary").between(3200,3600)).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|  Ali|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



##### when() and otherwise()

In [0]:
df.select(F.when(df["role"] == "Data scientist", 3) \
           .alias("role code")).show()

+---------+
|role code|
+---------+
|        3|
|     null|
|        3|
|     null|
|     null|
+---------+



In [0]:
df.select(F.when(df["role"] == "Data scientist", 3) \
           .when(df["role"] == "Data engineer", 2)   \
           .otherwise(1).alias("role code")).show()

+---------+
|role code|
+---------+
|        3|
|        2|
|        3|
|        2|
|        1|
+---------+



In [0]:
df.withColumn("salary",
              F.when(df["role"] == "Data scientist", df["salary"] + 50) \
               .otherwise(df["salary"])).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4550|
|James| Data engineer|  3200|
|Laura|Data scientist|  4150|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



##### Methods for missing values

In [0]:
data = [
  ("John", None, None),
  ("James", "Data engineer", 3200),
  ("Laura", None, 4100),
  (None, "Data engineer", 3200),
  ("Steve", "Developer", 3600)
  ]

df1 = spark.createDataFrame(data).toDF("name", "role", "salary")
df1.show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
| John|         null|  null|
|James|Data engineer|  3200|
|Laura|         null|  4100|
| null|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.filter(df1["role"].isNull()).show()

+-----+----+------+
| name|role|salary|
+-----+----+------+
| John|null|  null|
|Laura|null|  4100|
+-----+----+------+



In [0]:
df1.filter(F.expr("role IS NULL")).show()

+-----+----+------+
| name|role|salary|
+-----+----+------+
| John|null|  null|
|Laura|null|  4100|
+-----+----+------+



In [0]:
df1.filter(df1["role"].isNotNull()).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
| null|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.na

Out[100]: <pyspark.sql.dataframe.DataFrameNaFunctions at 0x7fe9d45342b0>

In [0]:
df1.na.fill("N/A").show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
| John|          N/A|  null|
|James|Data engineer|  3200|
|Laura|          N/A|  4100|
|  N/A|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.na.fill({"name": "N/A", "role": "N/A", "salary": 0}).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
| John|          N/A|     0|
|James|Data engineer|  3200|
|Laura|          N/A|  4100|
|  N/A|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.na.drop().show() 

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.na.drop(how="all", subset=["role", "salary"]).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|Laura|         null|  4100|
| null|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



In [0]:
df1.dropna(how="all", subset=["role", "salary"]).show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|James|Data engineer|  3200|
|Laura|         null|  4100|
| null|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



#### User Defined Functions (UDF)

In [0]:
df1 = spark.range(1, 11).select(F.col("id").cast("int").alias("number"))
df1.show()

+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
+------+



In [0]:
≈
# the default return type is string
def is_even(x):
    return x % 2 == 0
is_evenUDF = F.udf(lambda x: is_even(x))
df2 = df1.withColumn("even", is_evenUDF(F.col("number")))
df2.show()

[0;36m  File [0;32m"<command-3138747455353760>"[0;36m, line [0;32m1[0m
[0;31m    ≈[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid character '≈' (U+2248)


In [0]:
# You cannot use UDFs in a shared cluster
def is_even(x):
    return x % 2 == 0
is_evenUDF = F.udf(lambda x: is_even(x), BooleanType())
df2 = df1.withColumn("even", is_evenUDF(df1["number"]))
df2.show()

+------+-----+
|number| even|
+------+-----+
|     1|false|
|     2| true|
|     3|false|
|     4| true|
|     5|false|
|     6| true|
|     7|false|
|     8| true|
|     9|false|
|    10| true|
+------+-----+



In [0]:
df2 = df1.select(col("*"), is_evenUDF(col("number")).alias("even"))
df2.show()

+------+-----+
|number| even|
+------+-----+
|     1|false|
|     2| true|
|     3|false|
|     4| true|
|     5|false|
|     6| true|
|     7|false|
|     8| true|
|     9|false|
|    10| true|
+------+-----+



In [0]:
@F.udf(returnType=BooleanType())
def is_even(x):
    return x % 2 == 0

df2 = df1.withColumn("even", is_even(df1["number"]))
df2.show()

+------+-----+
|number| even|
+------+-----+
|     1|false|
|     2| true|
|     3|false|
|     4| true|
|     5|false|
|     6| true|
|     7|false|
|     8| true|
|     9|false|
|    10| true|
+------+-----+



In [0]:
@F.udf()
def func(name, role):
    return name+" "+role

df2 = df.withColumn("name_role", func(col("name"), col("role")))
df2.show()

+-----+--------------+------+--------------------+
| name|          role|salary|           name_role|
+-----+--------------+------+--------------------+
| John|Data scientist|  4500| John Data scientist|
|James| Data engineer|  3200| James Data engineer|
|Laura|Data scientist|  4100|Laura Data scientist|
|  Ali| Data engineer|  3200|   Ali Data engineer|
|Steve|     Developer|  3600|     Steve Developer|
+-----+--------------+------+--------------------+



##### UDFs with non-Column parameters

In [0]:
def add_const(column, number):
    return column+number

# It cannot be like this:
# def add_const_udf(col, number):
#     return F.udf(lambda col, number: add_const(col, number), T.IntegerType())
# the udf function should be a lambda func since they are curreid

add_const_udf = F.udf(lambda column, number: add_const(column, number), T.IntegerType())
df2 = df.withColumn("salary", add_const_udf(F.col("salary"), F.lit(200)))
df2.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4700|
|James| Data engineer|  3400|
|Laura|Data scientist|  4300|
|  Ali| Data engineer|  3400|
|Steve|     Developer|  3800|
+-----+--------------+------+



In [0]:
# Everything that is passed to an UDF is interpreted as a column / column name. If you want to pass a literal you have two options:
# one is currying
def add_const(column, number):
    return column+number

def add_const_udf(number):
    return F.udf(lambda column: add_const(column, number), IntegerType())

df2 = df.withColumn("salary", add_const_udf(200)(col("salary")))
df2.show()


+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4700|
|James| Data engineer|  3400|
|Laura|Data scientist|  4300|
|  Ali| Data engineer|  3400|
|Steve|     Developer|  3800|
+-----+--------------+------+



##### UDFs and broadcasting

In [0]:
v = [1, 2, 3]
broadcast_v = spark.sparkContext.broadcast(v)
broadcast_v.value

Out[114]: [1, 2, 3]

In [0]:
job_codes = {"Data scientist": 0, "Data engineer": 1, "Developer": 2}
broadcast_job_codes = spark.sparkContext.broadcast(job_codes)

def get_job_code_udf(codes):
    return F.udf(lambda role: codes[role])

df1 = df.select(F.col("role"), get_job_code_udf(broadcast_job_codes.value)(col("role")).alias("job_code"))
df1.show()

+--------------+--------+
|          role|job_code|
+--------------+--------+
|Data scientist|       0|
| Data engineer|       1|
|Data scientist|       0|
| Data engineer|       1|
|     Developer|       2|
+--------------+--------+



##### Null values in UDFs

In [0]:
# This doesn't run 
# data = [1, 2, 3, 4, None]
# df1 = spark.createDataFrame(data, T.IntegerType()).toDF("number")

# is_evenUDF = F.udf(lambda x: x % 2 == 0, T.BooleanType())
# df2 = df1.withColumn("even", is_evenUDF(F.col("number")))
# df2.show()

In [0]:
data = [1, 2, 3, 4, None]
df1 = spark.createDataFrame(data, IntegerType()).toDF("number")
def is_even(x):
    return x % 2 == 0 if x else None
    
is_evenUDF = F.udf(lambda x: is_even(x), BooleanType())
df2 = df1.withColumn("even", is_evenUDF(F.col("number")))
df2.show()

+------+-----+
|number| even|
+------+-----+
|     1|false|
|     2| true|
|     3|false|
|     4| true|
|  null| null|
+------+-----+



#### Timestamp

In [0]:
data = [
  ("2020-01-01 07:30:10.150007", "17.0"), 
  ("2020-01-02 07:30:10.150007", "25.5"),  
  ("2020-01-03 07:30:10.150007", "19.5"),  
  ("2020-01-04 07:30:10.150007", "21.2"),  
  ("2020-01-05 07:30:10.150007", "18.0"), 
  ("2020-01-06 07:30:10.150007", "20.5")
  ]
df1 = spark.createDataFrame(data).toDF("time", "temperature")

df1 = df1.withColumn("time", col("time").cast("timestamp"))
df1.show(truncate=False)

+--------------------------+-----------+
|time                      |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0       |
|2020-01-02 07:30:10.150007|25.5       |
|2020-01-03 07:30:10.150007|19.5       |
|2020-01-04 07:30:10.150007|21.2       |
|2020-01-05 07:30:10.150007|18.0       |
|2020-01-06 07:30:10.150007|20.5       |
+--------------------------+-----------+



In [0]:
df2 = df1.withColumn("hour", F.date_trunc('hour', col('time'))) \
         .withColumn("minute", F.date_trunc('minute', col('time')))
df2.show(truncate=False)

+--------------------------+-----------+-------------------+-------------------+
|time                      |temperature|hour               |minute             |
+--------------------------+-----------+-------------------+-------------------+
|2020-01-01 07:30:10.150007|17.0       |2020-01-01 07:00:00|2020-01-01 07:30:00|
|2020-01-02 07:30:10.150007|25.5       |2020-01-02 07:00:00|2020-01-02 07:30:00|
|2020-01-03 07:30:10.150007|19.5       |2020-01-03 07:00:00|2020-01-03 07:30:00|
|2020-01-04 07:30:10.150007|21.2       |2020-01-04 07:00:00|2020-01-04 07:30:00|
|2020-01-05 07:30:10.150007|18.0       |2020-01-05 07:00:00|2020-01-05 07:30:00|
|2020-01-06 07:30:10.150007|20.5       |2020-01-06 07:00:00|2020-01-06 07:30:00|
+--------------------------+-----------+-------------------+-------------------+



In [0]:
# Round the time column to nearest hour
df2 = df1.withColumn('time1', F.date_trunc('hour', col('time') + F.expr('INTERVAL 30 MINUTES')))
df2.show(truncate=False)

+--------------------------+-----------+-------------------+
|time                      |temperature|time1              |
+--------------------------+-----------+-------------------+
|2020-01-01 07:30:10.150007|17.0       |2020-01-01 08:00:00|
|2020-01-02 07:30:10.150007|25.5       |2020-01-02 08:00:00|
|2020-01-03 07:30:10.150007|19.5       |2020-01-03 08:00:00|
|2020-01-04 07:30:10.150007|21.2       |2020-01-04 08:00:00|
|2020-01-05 07:30:10.150007|18.0       |2020-01-05 08:00:00|
|2020-01-06 07:30:10.150007|20.5       |2020-01-06 08:00:00|
+--------------------------+-----------+-------------------+



##### Extracting timestamp parts

In [0]:
df2 = df1.withColumn("year", F.year(df1["time"])) \
         .withColumn("month", F.month(df1["time"])) \
         .withColumn("day", F.dayofmonth(df1["time"])) \
         .withColumn("week", F.weekofyear(df1["time"])) \
         .withColumn("day of week", F.dayofweek(df1["time"])) \
         .withColumn("hour", F.hour(df1["time"])) \
         .withColumn("minute", F.minute(df1["time"])) \
         .withColumn("second", F.second(df1["time"])) 

df2.show(truncate=False)

+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+
|time                      |temperature|year|month|day|week|day of week|hour|minute|second|
+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+
|2020-01-01 07:30:10.150007|17.0       |2020|1    |1  |1   |4          |7   |30    |10    |
|2020-01-02 07:30:10.150007|25.5       |2020|1    |2  |1   |5          |7   |30    |10    |
|2020-01-03 07:30:10.150007|19.5       |2020|1    |3  |1   |6          |7   |30    |10    |
|2020-01-04 07:30:10.150007|21.2       |2020|1    |4  |1   |7          |7   |30    |10    |
|2020-01-05 07:30:10.150007|18.0       |2020|1    |5  |1   |1          |7   |30    |10    |
|2020-01-06 07:30:10.150007|20.5       |2020|1    |6  |2   |2          |7   |30    |10    |
+--------------------------+-----------+----+-----+---+----+-----------+----+------+------+



In [0]:
# Total time in seconds
df1.select(F.unix_timestamp(col('time'))).show()

+-----------------------------------------+
|unix_timestamp(time, yyyy-MM-dd HH:mm:ss)|
+-----------------------------------------+
|                               1577863810|
|                               1577950210|
|                               1578036610|
|                               1578123010|
|                               1578209410|
|                               1578295810|
+-----------------------------------------+



In [0]:
# Total time in seconds
df1.select(col('time').cast('long')).show()

+----------+
|      time|
+----------+
|1577863810|
|1577950210|
|1578036610|
|1578123010|
|1578209410|
|1578295810|
+----------+



In [0]:
microsec_str = F.when(F.split(df1['time'], '[.]').getItem(1).isNull() , 0) \
                .otherwise(F.split(df1['time'], '[.]').getItem(1))
df1.select((microsec_str/ 10**F.length(microsec_str)).alias("microseconds")).show()

+------------+
|microseconds|
+------------+
|    0.150007|
|    0.150007|
|    0.150007|
|    0.150007|
|    0.150007|
|    0.150007|
+------------+



##### Changing the time zone

In [0]:
# Assuming time is in UTC
df2 = df1.withColumn("time", F.from_utc_timestamp(df1["time"], "America/New_York"))
df2.show(truncate=False)

+--------------------------+-----------+
|time                      |temperature|
+--------------------------+-----------+
|2020-01-01 02:30:10.150007|17.0       |
|2020-01-02 02:30:10.150007|25.5       |
|2020-01-03 02:30:10.150007|19.5       |
|2020-01-04 02:30:10.150007|21.2       |
|2020-01-05 02:30:10.150007|18.0       |
|2020-01-06 02:30:10.150007|20.5       |
+--------------------------+-----------+



In [0]:
# Going back to UTC
df2 = df2.withColumn("time", F.to_utc_timestamp(df2["time"], "America/New_York"))
df2.show(truncate=False)

+--------------------------+-----------+
|time                      |temperature|
+--------------------------+-----------+
|2020-01-01 07:30:10.150007|17.0       |
|2020-01-02 07:30:10.150007|25.5       |
|2020-01-03 07:30:10.150007|19.5       |
|2020-01-04 07:30:10.150007|21.2       |
|2020-01-05 07:30:10.150007|18.0       |
|2020-01-06 07:30:10.150007|20.5       |
+--------------------------+-----------+



##### Creating a date range

In [0]:
start_dt='2018-01-01'
end_dt='2018-05-01'
query_str = "SELECT explode(sequence(to_date('{}'), to_date('{}'), interval 1 month)) as date".format(start_dt, end_dt)
df1 = spark.sql(query_str)
df1.show()

+----------+
|      date|
+----------+
|2018-01-01|
|2018-02-01|
|2018-03-01|
|2018-04-01|
|2018-05-01|
+----------+



#### Partitions

In [0]:
df1 = spark.range(1, 11).select(col("id").cast("int").alias("number"))
df1.rdd.getNumPartitions()

Out[128]: 8

In [0]:
df2 = df1.repartition(3)
df2.rdd.getNumPartitions()

Out[129]: 3

#### Adding an index

In [0]:
df1 = spark.range(1, 11).select(col("id").cast("int").alias("number"))
df2 = df1.repartition(3)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()

+------+-----------+
|number|         id|
+------+-----------+
|     5|          0|
|     6|          1|
|     7|          2|
|     9|          3|
|     1| 8589934592|
|     2| 8589934593|
|     4| 8589934594|
|     3|17179869184|
|     8|17179869185|
|    10|17179869186|
+------+-----------+



In [0]:
df1 = spark.range(1, 13).select(col("id").cast("int").alias("number")).coalesce(4)
df2 = df1.withColumn("id", F.monotonically_increasing_id())
df2.show()

+------+-----------+
|number|         id|
+------+-----------+
|     1|          0|
|     2|          1|
|     3|          2|
|     4| 8589934592|
|     5| 8589934593|
|     6| 8589934594|
|     7|17179869184|
|     8|17179869185|
|     9|17179869186|
|    10|25769803776|
|    11|25769803777|
|    12|25769803778|
+------+-----------+



In [0]:
df2 = df1.repartition(2)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()

+------+----------+
|number|        id|
+------+----------+
|     1|         0|
|     3|         1|
|     4|         2|
|     5|         3|
|     7|         4|
|     8|         5|
|    10|         6|
|    11|         7|
|     2|8589934592|
|     6|8589934593|
|     9|8589934594|
|    12|8589934595|
+------+----------+



In [0]:
df2 = df1.coalesce(2)
df2 = df2.withColumn("id", F.monotonically_increasing_id())
df2.show()

+------+----------+
|number|        id|
+------+----------+
|     1|         0|
|     2|         1|
|     3|         2|
|     4|         3|
|     5|         4|
|     6|         5|
|     7|8589934592|
|     8|8589934593|
|     9|8589934594|
|    10|8589934595|
|    11|8589934596|
|    12|8589934597|
+------+----------+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()

zipped_rdd = df.rdd.zipWithIndex()
zipped_rdd.collect()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+

Out[134]: [(Row(name='John', role='Data scientist', salary=4500), 0),
 (Row(name='James', role='Data engineer', salary=3200), 1),
 (Row(name='Laura', role='Data scientist', salary=4100), 2),
 (Row(name='Ali', role='Data engineer', salary=3200), 3),
 (Row(name='Steve', role='Developer', salary=3600), 4)]

In [0]:
rdd = df.rdd.zipWithIndex().map(lambda row: (row[1],) + tuple(row[0]))
df2 = spark.createDataFrame(rdd, schema= ['id']+ list(df.columns))
df2.show()

+---+-----+--------------+------+
| id| name|          role|salary|
+---+-----+--------------+------+
|  0| John|Data scientist|  4500|
|  1|James| Data engineer|  3200|
|  2|Laura|Data scientist|  4100|
|  3|  Ali| Data engineer|  3200|
|  4|Steve|     Developer|  3600|
+---+-----+--------------+------+



#### Sorting

In [0]:
df.sort("name").show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|  Ali| Data engineer|  3200|
|James| Data engineer|  3200|
| John|Data scientist|  4500|
|Laura|Data scientist|  4100|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df.sort(df["name"]).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|  Ali| Data engineer|  3200|
|James| Data engineer|  3200|
| John|Data scientist|  4500|
|Laura|Data scientist|  4100|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
df.sort("salary", "name").show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|  Ali| Data engineer|  3200|
|James| Data engineer|  3200|
|Steve|     Developer|  3600|
|Laura|Data scientist|  4100|
| John|Data scientist|  4500|
+-----+--------------+------+



In [0]:
df.sort(df["name"].desc()).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|Steve|     Developer|  3600|
|Laura|Data scientist|  4100|
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|  Ali| Data engineer|  3200|
+-----+--------------+------+



In [0]:
df.sort(F.desc("name")).show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
|Steve|     Developer|  3600|
|Laura|Data scientist|  4100|
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|  Ali| Data engineer|  3200|
+-----+--------------+------+



#### Aggregation

In [0]:
df.groupBy("role").count().show()

+--------------+-----+
|          role|count|
+--------------+-----+
|Data scientist|    2|
| Data engineer|    2|
|     Developer|    1|
+--------------+-----+



In [0]:
df.groupBy("role", "salary").count().show()

+--------------+------+-----+
|          role|salary|count|
+--------------+------+-----+
|Data scientist|  4500|    1|
| Data engineer|  3200|    2|
|Data scientist|  4100|    1|
|     Developer|  3600|    1|
+--------------+------+-----+



In [0]:
df.groupBy("role").agg(F.count("salary"), F.avg("salary")).show()

+--------------+-------------+-----------+
|          role|count(salary)|avg(salary)|
+--------------+-------------+-----------+
|Data scientist|            2|     4300.0|
| Data engineer|            2|     3200.0|
|     Developer|            1|     3600.0|
+--------------+-------------+-----------+



In [0]:
df.agg(F.sum("salary")).show()

+-----------+
|sum(salary)|
+-----------+
|      18600|
+-----------+



In [0]:
df.select(F.sum("salary")).show()

+-----------+
|sum(salary)|
+-----------+
|      18600|
+-----------+



#### Pivoting

In [0]:
data = [("P1", 100, "Vancouver"), ("P2", 150, "Vancouver"), ("P3", 130, "Vancouver"), 
        ("P4", 190, "Vancouver"), ("P1", 50, "Toronto"), 
        ("P2", 60, "Toronto"), ("P3", 70, "Toronto"), ("P4", 60, "Toronto"), 
        ("P1", 30, "Calgary"), ("P2", 140 ,"Calgary")]
 
product_df = spark.createDataFrame(data).toDF("product_id", "quantity", "city")
product_df.show()

+----------+--------+---------+
|product_id|quantity|     city|
+----------+--------+---------+
|        P1|     100|Vancouver|
|        P2|     150|Vancouver|
|        P3|     130|Vancouver|
|        P4|     190|Vancouver|
|        P1|      50|  Toronto|
|        P2|      60|  Toronto|
|        P3|      70|  Toronto|
|        P4|      60|  Toronto|
|        P1|      30|  Calgary|
|        P2|     140|  Calgary|
+----------+--------+---------+



In [0]:
pivot_df = product_df.groupBy("product_id").pivot("city").sum("quantity")
pivot_df.show()

+----------+-------+-------+---------+
|product_id|Calgary|Toronto|Vancouver|
+----------+-------+-------+---------+
|        P4|   null|     60|      190|
|        P3|   null|     70|      130|
|        P1|     30|     50|      100|
|        P2|    140|     60|      150|
+----------+-------+-------+---------+



In [0]:
cities = ["Calgary", "Toronto", "Vancouver"]
pivot_df =product_df.groupBy("product_id").pivot("city", cities).sum("quantity")
pivot_df.show()

+----------+-------+-------+---------+
|product_id|Calgary|Toronto|Vancouver|
+----------+-------+-------+---------+
|        P4|   null|     60|      190|
|        P3|   null|     70|      130|
|        P1|     30|     50|      100|
|        P2|    140|     60|      150|
+----------+-------+-------+---------+



##### Unpivoting

In [0]:
expr = "stack(3, 'Vancouver', Vancouver, 'Toronto', Toronto, 'Calgary', Calgary) as (city, quantity)"
unpivot_df = pivot_df.select(F.col("product_id"), F.expr(expr)) \
                     .filter(F.col("quantity").isNotNull())
unpivot_df.show()

+----------+---------+--------+
|product_id|     city|quantity|
+----------+---------+--------+
|        P4|Vancouver|     190|
|        P4|  Toronto|      60|
|        P3|Vancouver|     130|
|        P3|  Toronto|      70|
|        P1|Vancouver|     100|
|        P1|  Toronto|      50|
|        P1|  Calgary|      30|
|        P2|Vancouver|     150|
|        P2|  Toronto|      60|
|        P2|  Calgary|     140|
+----------+---------+--------+



#### Window functions

In [0]:
data = [
  ("2020-01-01 07:30:00", "17.0"), 
  ("2020-01-02 07:30:00", "25.5"),  
  ("2020-01-03 07:30:00", "19.5"),  
  ("2020-01-04 07:30:00", "21.2"),  
  ("2020-01-05 07:30:00", "18.0"), 
  ("2020-01-06 07:30:00", "20.5")
  ]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("temperature", col("temperature").cast(DoubleType()))
df1.show()

+-------------------+-----------+
|               time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00|       17.0|
|2020-01-02 07:30:00|       25.5|
|2020-01-03 07:30:00|       19.5|
|2020-01-04 07:30:00|       21.2|
|2020-01-05 07:30:00|       18.0|
|2020-01-06 07:30:00|       20.5|
+-------------------+-----------+



In [0]:
w = Window.orderBy("time").rowsBetween(-2, Window.currentRow)  
df1.withColumn("rolling_average", F.round(F.avg("temperature").over(w), 2)).show()

+-------------------+-----------+---------------+
|               time|temperature|rolling_average|
+-------------------+-----------+---------------+
|2020-01-01 07:30:00|       17.0|           17.0|
|2020-01-02 07:30:00|       25.5|          21.25|
|2020-01-03 07:30:00|       19.5|          20.67|
|2020-01-04 07:30:00|       21.2|          22.07|
|2020-01-05 07:30:00|       18.0|          19.57|
|2020-01-06 07:30:00|       20.5|           19.9|
+-------------------+-----------+---------------+



In [0]:
w = Window.orderBy(df1["time"].desc()).rowsBetween(-2, Window.currentRow)  
df1.withColumn("rolling_average", F.round(F.avg("temperature").over(w), 2)).show()

+-------------------+-----------+---------------+
|               time|temperature|rolling_average|
+-------------------+-----------+---------------+
|2020-01-06 07:30:00|       20.5|           20.5|
|2020-01-05 07:30:00|       18.0|          19.25|
|2020-01-04 07:30:00|       21.2|           19.9|
|2020-01-03 07:30:00|       19.5|          19.57|
|2020-01-02 07:30:00|       25.5|          22.07|
|2020-01-01 07:30:00|       17.0|          20.67|
+-------------------+-----------+---------------+



In [0]:
w = Window.orderBy("time")
df1.withColumn("cum_sum", F.sum("temperature").over(w)).show()

+-------------------+-----------+-------+
|               time|temperature|cum_sum|
+-------------------+-----------+-------+
|2020-01-01 07:30:00|       17.0|   17.0|
|2020-01-02 07:30:00|       25.5|   42.5|
|2020-01-03 07:30:00|       19.5|   62.0|
|2020-01-04 07:30:00|       21.2|   83.2|
|2020-01-05 07:30:00|       18.0|  101.2|
|2020-01-06 07:30:00|       20.5|  121.7|
+-------------------+-----------+-------+



In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")

window_spec  = Window.partitionBy("role").orderBy("salary")
df.withColumn("row_number", F.row_number().over(window_spec)).show()

+-----+--------------+------+----------+
| name|          role|salary|row_number|
+-----+--------------+------+----------+
|James| Data engineer|  3200|         1|
|  Ali| Data engineer|  3200|         2|
|Laura|Data scientist|  4100|         1|
| John|Data scientist|  4500|         2|
|Steve|     Developer|  3600|         1|
+-----+--------------+------+----------+



In [0]:
data = [(1, 1),(2, 2), (2, 3), (2, 4), (4, 5), (6, 6), (7, 7)]
df1 = spark.createDataFrame(data).toDF("id", "num")
w = Window.orderBy("id").rowsBetween(Window.currentRow, 1)  
df1.withColumn("rolling_sum", F.sum("num").over(w)).show()

+---+---+-----------+
| id|num|rolling_sum|
+---+---+-----------+
|  1|  1|          3|
|  2|  2|          5|
|  2|  3|          7|
|  2|  4|          9|
|  4|  5|         11|
|  6|  6|         13|
|  7|  7|          7|
+---+---+-----------+



In [0]:
w1 = Window.orderBy("id").rangeBetween(Window.currentRow, 1)  
df1.withColumn("rolling_sum", F.sum("num").over(w1)).show()

+---+---+-----------+
| id|num|rolling_sum|
+---+---+-----------+
|  1|  1|         10|
|  2|  2|          9|
|  2|  3|          9|
|  2|  4|          9|
|  4|  5|          5|
|  6|  6|         13|
|  7|  7|          7|
+---+---+-----------+



In [0]:
w2 = Window.orderBy(F.col("id") * 2).rangeBetween(Window.currentRow, 1)  
df1.withColumn("rolling_sum", F.sum("num").over(w1)).withColumn("id*2", df1["id"]*2).show()

+---+---+-----------+----+
| id|num|rolling_sum|id*2|
+---+---+-----------+----+
|  1|  1|         10|   2|
|  2|  2|          9|   4|
|  2|  3|          9|   4|
|  2|  4|          9|   4|
|  4|  5|          5|   8|
|  6|  6|         13|  12|
|  7|  7|          7|  14|
+---+---+-----------+----+



##### Shifting  a column

In [0]:
w = Window.orderBy("time")  
df1.withColumn("lag_col", F.lag("temperature", 1).over(w)).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1442977062306290>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0mw[0m [0;34m=[0m [0mWindow[0m[0;34m.[0m[0morderBy[0m[0;34m([0m[0;34m"time"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf1[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"lag_col"[0m[0;34m,[0m [0mF[0m[0;34m.[0m[0mlag[0m[0;34m([0m[0;34m"temperature"[0m[0;34m,[0m [0;36m1[0m[0;34m)[0m[0;34m.[0m[0mover[0m[0;34m([0m[0mw[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)

In [0]:
df1.withColumn("lead_col", F.lag("temperature", -1, 0).over(w)).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1442977062306291>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf1[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"lead_col"[0m[0;34m,[0m [0mF[0m[0;34m.[0m[0mlag[0m[0;34m([0m[0;34m"temperature"[0m[0;34m,[0m [0;34m-[0m[0;36m1[0m[0;34m,[0m [0;36m0[0m[0;34m)[0m[0;34m.[0m[0mover[0m[0;34m([0m[0mw[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0

In [0]:
w = Window.orderBy("time")
df1.withColumn("diff", F.col("temperature") - F.lag("temperature", 1).over(w)).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1442977062306292>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0mw[0m [0;34m=[0m [0mWindow[0m[0;34m.[0m[0morderBy[0m[0;34m([0m[0;34m"time"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf1[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"diff"[0m[0;34m,[0m [0mF[0m[0;34m.[0m[0mcol[0m[0;34m([0m[0;34m"temperature"[0m[0;34m)[0m [0;34m-[0m [0mF[0m[0;34m.[0m[0mlag[0m[0;34m([0m[0;34m"temperature"[0m[0;34m,[0m [0;36m1[0m[0;34m)[0m[0;34m.[0m[0mover[0m[0;34m([0m[0mw[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             

##### Filling a column

In [0]:
data = [
  ("2020-01-01 07:30:00", None), 
  ("2020-01-02 07:30:00", 25.5),  
  ("2020-01-03 07:30:00", None),  
  ("2020-01-04 07:30:00", 21.2),  
  ("2020-01-05 07:30:00", 18.0), 
  ("2020-01-06 07:30:00", None)
  ]

df1 = spark.createDataFrame(data).toDF("time", "temperature")
df1 = df1.withColumn("time", F.col("time").cast('timestamp'))
df1.show()

+-------------------+-----------+
|               time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00|       null|
|2020-01-02 07:30:00|       25.5|
|2020-01-03 07:30:00|       null|
|2020-01-04 07:30:00|       21.2|
|2020-01-05 07:30:00|       18.0|
|2020-01-06 07:30:00|       null|
+-------------------+-----------+



In [0]:
# ffill
w = Window.orderBy('time')
df2 = df1.select([F.last(c, ignorenulls=True).over(w).alias(c) for c in df1.columns])
df2.show()

+-------------------+-----------+
|               time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00|       null|
|2020-01-02 07:30:00|       25.5|
|2020-01-03 07:30:00|       25.5|
|2020-01-04 07:30:00|       21.2|
|2020-01-05 07:30:00|       18.0|
|2020-01-06 07:30:00|       18.0|
+-------------------+-----------+



In [0]:
# bfill
w = Window.orderBy('time').rowsBetween(Window.currentRow, Window.unboundedFollowing)
df2 = df1.select([F.first(c,ignorenulls=True).over(w).alias(c) for c in df1.columns])
df2.show()

+-------------------+-----------+
|               time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00|       25.5|
|2020-01-02 07:30:00|       25.5|
|2020-01-03 07:30:00|       21.2|
|2020-01-04 07:30:00|       21.2|
|2020-01-05 07:30:00|       18.0|
|2020-01-06 07:30:00|       null|
+-------------------+-----------+



In [0]:
# ffill.cfill
w1 = Window.orderBy('time')
w2 = w1.rowsBetween(Window.currentRow, Window.unboundedFollowing)
df2 = df1.select([F.coalesce(F.last(c, ignorenulls=True).over(w1), F.first(c, ignorenulls=True).over(w2)).alias(c) for c in df1.columns])
df2.show()

+-------------------+-----------+
|               time|temperature|
+-------------------+-----------+
|2020-01-01 07:30:00|       25.5|
|2020-01-02 07:30:00|       25.5|
|2020-01-03 07:30:00|       25.5|
|2020-01-04 07:30:00|       21.2|
|2020-01-05 07:30:00|       18.0|
|2020-01-06 07:30:00|       18.0|
+-------------------+-----------+



#### Joins

In [0]:
data =[ 
  ("John", "Data scientist", 4500),
  ("James", "Data engineer", 3200),
  ("Laura", "Data scientist", 4100),
  ("Ali", "Data engineer", 3200),
  ("Steve", "Developer", 3600)
]
df = spark.createDataFrame(data).toDF("name","role", "salary")
df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|
|Steve|     Developer|  3600|
+-----+--------------+------+



In [0]:
data = [
  ("John", 45),
  ("James", 25),
  ("Laura", 30),
  ("Will", 28)
]
age_df = spark.createDataFrame(data).toDF("name", "age")
age_df.show()

+-----+---+
| name|age|
+-----+---+
| John| 45|
|James| 25|
|Laura| 30|
| Will| 28|
+-----+---+



##### Inner join

In [0]:
joined_df = df.join(age_df, df["name"] == age_df["name"], "inner")
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
|James| Data engineer|  3200|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
+-----+--------------+------+-----+---+



In [0]:
joined_df = df.join(age_df, ["name"], "inner")
joined_df.show()

+-----+--------------+------+---+
| name|          role|salary|age|
+-----+--------------+------+---+
| John|Data scientist|  4500| 45|
|James| Data engineer|  3200| 25|
|Laura|Data scientist|  4100| 30|
+-----+--------------+------+---+



In [0]:
df.join(age_df, "name").show()

+-----+--------------+------+---+
| name|          role|salary|age|
+-----+--------------+------+---+
| John|Data scientist|  4500| 45|
|James| Data engineer|  3200| 25|
|Laura|Data scientist|  4100| 30|
+-----+--------------+------+---+



In [0]:
joined_df = df.alias("df1").join(age_df.alias("df2"), col("df1.name") == col("df2.name"), "inner")
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
|James| Data engineer|  3200|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
+-----+--------------+------+-----+---+



In [0]:
joined_df = df.alias("df1").join(age_df.alias("df2"), F.expr("df1.name == df2.name"), "inner")
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
|James| Data engineer|  3200|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
+-----+--------------+------+-----+---+



In [0]:
joined_df = df.alias("df1").join(df.alias("df2"), F.col("df1.salary") < F.col("df2.salary"), "inner")
joined_df.show()

+-----+--------------+------+-----+--------------+------+
| name|          role|salary| name|          role|salary|
+-----+--------------+------+-----+--------------+------+
|James| Data engineer|  3200| John|Data scientist|  4500|
|James| Data engineer|  3200|Laura|Data scientist|  4100|
|James| Data engineer|  3200|Steve|     Developer|  3600|
|Laura|Data scientist|  4100| John|Data scientist|  4500|
|  Ali| Data engineer|  3200| John|Data scientist|  4500|
|  Ali| Data engineer|  3200|Laura|Data scientist|  4100|
|  Ali| Data engineer|  3200|Steve|     Developer|  3600|
|Steve|     Developer|  3600| John|Data scientist|  4500|
|Steve|     Developer|  3600|Laura|Data scientist|  4100|
+-----+--------------+------+-----+--------------+------+



In [0]:
age_df1 = age_df.withColumnRenamed("name", "employee_name") 
joined_df = df.join(age_df1, F.expr("name == employee_name"), "inner")
joined_df.show()

+-----+--------------+------+-------------+---+
| name|          role|salary|employee_name|age|
+-----+--------------+------+-------------+---+
| John|Data scientist|  4500|         John| 45|
|James| Data engineer|  3200|        James| 25|
|Laura|Data scientist|  4100|        Laura| 30|
+-----+--------------+------+-------------+---+



In [0]:
joined_df = df.join(age_df).where(df["name"] == age_df["name"])
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
|James| Data engineer|  3200|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
+-----+--------------+------+-----+---+



##### Left and right joins

In [0]:
joined_df = df.join(age_df, df["name"] == age_df["name"], "left")
joined_df.show()

+-----+--------------+------+-----+----+
| name|          role|salary| name| age|
+-----+--------------+------+-----+----+
| John|Data scientist|  4500| John|  45|
|James| Data engineer|  3200|James|  25|
|Laura|Data scientist|  4100|Laura|  30|
|  Ali| Data engineer|  3200| null|null|
|Steve|     Developer|  3600| null|null|
+-----+--------------+------+-----+----+



In [0]:
joined_df = df.join(age_df, "name", "left")
joined_df.show()

+-----+--------------+------+----+
| name|          role|salary| age|
+-----+--------------+------+----+
| John|Data scientist|  4500|  45|
|James| Data engineer|  3200|  25|
|Laura|Data scientist|  4100|  30|
|  Ali| Data engineer|  3200|null|
|Steve|     Developer|  3600|null|
+-----+--------------+------+----+



In [0]:
joined_df = df.join(age_df, ["name"], "right")
joined_df.show()

+-----+--------------+------+---+
| name|          role|salary|age|
+-----+--------------+------+---+
| John|Data scientist|  4500| 45|
|James| Data engineer|  3200| 25|
|Laura|Data scientist|  4100| 30|
| Will|          null|  null| 28|
+-----+--------------+------+---+



##### Full join

In [0]:
df.join(age_df, "name", "full").show()

+-----+--------------+------+----+
| name|          role|salary| age|
+-----+--------------+------+----+
|  Ali| Data engineer|  3200|null|
|James| Data engineer|  3200|  25|
| John|Data scientist|  4500|  45|
|Laura|Data scientist|  4100|  30|
|Steve|     Developer|  3600|null|
| Will|          null|  null|  28|
+-----+--------------+------+----+



##### Left semi join

In [0]:
joined_df = df.join(age_df, df["name"] == age_df["name"], "leftsemi")
joined_df.show()

+-----+--------------+------+
| name|          role|salary|
+-----+--------------+------+
| John|Data scientist|  4500|
|James| Data engineer|  3200|
|Laura|Data scientist|  4100|
+-----+--------------+------+



##### Left anti join

In [0]:
joined_df = df.join(age_df, df["name"] == age_df["name"], "leftanti")
joined_df.show()

+-----+-------------+------+
| name|         role|salary|
+-----+-------------+------+
|  Ali|Data engineer|  3200|
|Steve|    Developer|  3600|
+-----+-------------+------+



##### Cross join

In [0]:
joined_df = df.join(age_df, df["name"] == age_df["name"], "cross")
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
|James| Data engineer|  3200|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
+-----+--------------+------+-----+---+



In [0]:
joined_df = df.crossJoin(age_df)
joined_df.show()

+-----+--------------+------+-----+---+
| name|          role|salary| name|age|
+-----+--------------+------+-----+---+
| John|Data scientist|  4500| John| 45|
| John|Data scientist|  4500|James| 25|
| John|Data scientist|  4500|Laura| 30|
| John|Data scientist|  4500| Will| 28|
|James| Data engineer|  3200| John| 45|
|James| Data engineer|  3200|James| 25|
|James| Data engineer|  3200|Laura| 30|
|James| Data engineer|  3200| Will| 28|
|Laura|Data scientist|  4100| John| 45|
|Laura|Data scientist|  4100|James| 25|
|Laura|Data scientist|  4100|Laura| 30|
|Laura|Data scientist|  4100| Will| 28|
|  Ali| Data engineer|  3200| John| 45|
|  Ali| Data engineer|  3200|James| 25|
|  Ali| Data engineer|  3200|Laura| 30|
|  Ali| Data engineer|  3200| Will| 28|
|Steve|     Developer|  3600| John| 45|
|Steve|     Developer|  3600|James| 25|
|Steve|     Developer|  3600|Laura| 30|
|Steve|     Developer|  3600| Will| 28|
+-----+--------------+------+-----+---+



#### Concatenating DataFrames

In [0]:
df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([(3, 4, "b")]).toDF("col0", "col1", "col2")
df1.union(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   a|
|   3|   4|   b|
+----+----+----+



In [0]:
df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([("b", 3, 4)]).toDF("col2", "col0", "col1")
df1.union(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   a|
|   b|   3|   4|
+----+----+----+



In [0]:
df1 = spark.createDataFrame([(1, 2, "a")]).toDF("col0", "col1", "col2")
df2 = spark.createDataFrame([("b", 3, 4)]).toDF("col2", "col0", "col1")
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   a|
|   3|   4|   b|
+----+----+----+



In [0]:
df1 = spark.createDataFrame([(1, 2), (3, 4)]).toDF("col0", "col1")
df2 = spark.createDataFrame([("a", "b"), ("c", "d")]).toDF("col3", "col4")

df1.withColumn("row_id", F.monotonically_increasing_id()) \
   .join(df2.withColumn("row_id", F.monotonically_increasing_id()), "row_id") \
   .drop("row_id").show()

+----+----+----+----+
|col0|col1|col3|col4|
+----+----+----+----+
|   1|   2|   a|   b|
|   3|   4|   c|   d|
+----+----+----+----+



#### Resampling

In [0]:
data = [
  ("2020-01-02 01:00:00", 1), 
  ("2020-01-02 01:20:00", 2),  
  ("2020-01-02 01:40:00", 4),  
  ("2020-01-02 02:00:00", 5),  
  ("2020-01-02 02:20:00", 6), 
  ("2020-01-02 02:40:00", 7),
  ("2020-01-02 03:00:00", 8), 
  ("2020-01-02 03:20:00", 9),  
  ("2020-01-02 03:40:00", 10),  
  ("2020-01-02 04:00:00", 11),  
  ("2020-01-02 04:20:00", 12), 
  ("2020-01-02 04:40:00", 13),
  ("2020-01-02 05:00:00", 14),
  ]

df1 = spark.createDataFrame(data).toDF("time", "value")
df1 = df1.withColumn("time", F.col("time").cast("timestamp"))
df1.show()

+-------------------+-----+
|               time|value|
+-------------------+-----+
|2020-01-02 01:00:00|    1|
|2020-01-02 01:20:00|    2|
|2020-01-02 01:40:00|    4|
|2020-01-02 02:00:00|    5|
|2020-01-02 02:20:00|    6|
|2020-01-02 02:40:00|    7|
|2020-01-02 03:00:00|    8|
|2020-01-02 03:20:00|    9|
|2020-01-02 03:40:00|   10|
|2020-01-02 04:00:00|   11|
|2020-01-02 04:20:00|   12|
|2020-01-02 04:40:00|   13|
|2020-01-02 05:00:00|   14|
+-------------------+-----+



In [0]:
# df1.resample('1H', closed='left', label = 'left').sum()
df1.groupBy(F.date_trunc('hour', df1['time']).alias('time')) \
   .sum().orderBy('time').show()

+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 01:00:00|         7|
|2020-01-02 02:00:00|        18|
|2020-01-02 03:00:00|        27|
|2020-01-02 04:00:00|        36|
|2020-01-02 05:00:00|        14|
+-------------------+----------+



In [0]:
# df1.resample('1H', closed='left', label = 'right').sum()
df1.groupBy((F.date_trunc('hour', df1['time'])+F.expr('INTERVAL 60 MINUTES')) \
   .alias('time')).sum().orderBy('time').show()

+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 02:00:00|         7|
|2020-01-02 03:00:00|        18|
|2020-01-02 04:00:00|        27|
|2020-01-02 05:00:00|        36|
|2020-01-02 06:00:00|        14|
+-------------------+----------+



In [0]:
# df1.resample('1H', closed='right', label = 'right').sum()
diff = col('time').cast('long') - F.date_trunc('hour', col('time')).cast('long')
ind = F.when(diff == 0, col('time')) \
       .otherwise(F.date_trunc('hour', col('time')+F.expr('INTERVAL 60 MINUTES')))
df1.groupBy(ind.alias('time')) \
   .sum().orderBy('time').show()

+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 01:00:00|         1|
|2020-01-02 02:00:00|        11|
|2020-01-02 03:00:00|        21|
|2020-01-02 04:00:00|        30|
|2020-01-02 05:00:00|        39|
+-------------------+----------+



In [0]:
# df1.resample('1H', closed='right', label = 'left').sum()
diff = col('time').cast('long') - F.date_trunc('hour', col('time')).cast('long')
ind = F.when(diff == 0, col('time')) \
       .otherwise(F.date_trunc('hour', col('time')+F.expr('INTERVAL 60 MINUTES')))
df1.groupBy((ind-F.expr('INTERVAL 60 MINUTES')).alias('time')) \
           .sum().orderBy('time').show()

+-------------------+----------+
|               time|sum(value)|
+-------------------+----------+
|2020-01-02 00:00:00|         1|
|2020-01-02 01:00:00|        11|
|2020-01-02 02:00:00|        21|
|2020-01-02 03:00:00|        30|
|2020-01-02 04:00:00|        39|
+-------------------+----------+



In [0]:
data = [
  ("2020-01-01 01:00:00", 1), 
  ("2020-01-01 05:00:00", 2),  
  ("2020-01-01 10:00:00", 3),  
  ]

df1 = spark.createDataFrame(data).toDF("time", "value")
df1 = df1.withColumn("time", F.col("time").cast("timestamp"))
df1.show()

+-------------------+-----+
|               time|value|
+-------------------+-----+
|2020-01-01 01:00:00|    1|
|2020-01-01 05:00:00|    2|
|2020-01-01 10:00:00|    3|
+-------------------+-----+



In [0]:
hour = 60 * 60 
epoch = (col("time").cast("long") / hour).cast("long") * hour
with_epoch = df1.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(F.min("epoch"), F.max("epoch")).first()

time_range = spark.range(min_epoch, max_epoch + 1, hour).toDF("epoch")

time_range.join(with_epoch.select(['epoch', 'value']), "epoch", "left") \
          .orderBy("epoch").withColumn("time", col("epoch") \
          .cast("timestamp")).drop('epoch') \
          .select(['time', 'value']).show()

+-------------------+-----+
|               time|value|
+-------------------+-----+
|2020-01-01 01:00:00|    1|
|2020-01-01 02:00:00| null|
|2020-01-01 03:00:00| null|
|2020-01-01 04:00:00| null|
|2020-01-01 05:00:00|    2|
|2020-01-01 06:00:00| null|
|2020-01-01 07:00:00| null|
|2020-01-01 08:00:00| null|
|2020-01-01 09:00:00| null|
|2020-01-01 10:00:00|    3|
+-------------------+-----+

