# Using Python DataFrames with the Spark API for data engineering tasks

This notebook explores basic and intermediate tasks and operators that a data engineer should be comfortable using. These tasks can be written in a similar way in Scala (Spark).

## Create Dataframe

In [0]:
# import pyspark class Row from module sql
from pyspark.sql import *

# Sample data: four departments and five employees, all modelled as Rows.

# Departments are built with keyword arguments, so each field is named in place.
department1, department2, department3, department4 = (
    Row(id='123456', name='Computer Science'),
    Row(id='789012', name='Mechanical Engineering'),
    Row(id='345678', name='Theater and Drama'),
    Row(id='901234', name='Indoor Recreation'),
)

# Row("a", "b", ...) acts as a reusable template; calling it fills the fields by position.
Employee = Row("firstName", "lastName", "email", "salary")
employee1, employee2, employee3, employee4, employee5 = (
    Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000),
    Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000),
    Employee('matei', None, 'no-reply@waterloo.edu', 140000),    # no lastName
    Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000),  # no firstName
    Employee('michael', 'jackson', 'no-reply@neverla.nd', 80000),
)

# Pair every department with a list of employees (an employee may appear in several lists).
(
    departmentWithEmployees1,
    departmentWithEmployees2,
    departmentWithEmployees3,
    departmentWithEmployees4,
) = (
    Row(department=department1, employees=[employee1, employee2]),
    Row(department=department2, employees=[employee3, employee4]),
    Row(department=department3, employees=[employee5, employee4]),
    Row(department=department4, employees=[employee2, employee3]),
)

# Quick look: a department, an employee, and a field reached through the nesting.
print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)

## Create dataframes from list of rows

In [0]:
# Split the nested rows into two batches of two departments each.
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]

# No explicit schema is passed to createDataFrame; it is built from the Row objects.
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

# Show the first batch, then the second.
display(df1)
display(df2)

department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"


department,employees
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


## Union of two dataframes

In [0]:
# Append the rows of df2 to those of df1; both frames were built from the same
# Row layout (department, employees), so their columns line up.
unionDF = df1.union(df2)
display(unionDF)

department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


## Write the unioned DataFrame to a Parquet file

In [0]:
# Clear any output left by a previous run, then write the unioned frame as Parquet.
parquet_path = "/tmp/databricks-df-example.parquet"
dbutils.fs.rm(parquet_path, True)
unionDF.write.parquet(parquet_path)

## Read from a Parquet file

In [0]:
# Load the Parquet data under /tmp back into a DataFrame and show it.
parquetDF = spark.read.format("parquet").load("/tmp/databricks-df-example.parquet")
display(parquetDF)

department,employees
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


## Explode the `employees` column

In [0]:
from pyspark.sql.functions import explode

# Turn every element of the `employees` array into its own row, exposed as column `e`.
explodeDF = unionDF.select(explode(unionDF.employees).alias("e"))

# Lift the fields of `e` up into top-level columns.
flattenDF = explodeDF.selectExpr(
    "e.firstName",
    "e.lastName",
    "e.email",
    "e.salary",
)

flattenDF.show()

## Filtering data (rows) to match the predicate

In [0]:
# Keep only the rows whose firstName is "xiangrui", ordered by lastName.
is_xiangrui = flattenDF.firstName == "xiangrui"
filterDF = flattenDF.filter(is_xiangrui).sort(flattenDF.lastName)
display(filterDF)
# Alternatively, build the predicate with col() as the next cell does.

firstName,lastName,email,salary
xiangrui,meng,no-reply@stanford.edu,120000
xiangrui,meng,no-reply@stanford.edu,120000


In [0]:
from pyspark.sql.functions import col, asc

# Column predicates are combined with `|` rather than Python's `or`;
# each comparison is wrapped in its own parentheses.
filterDF = (
    flattenDF
    .filter((col("firstName") == "xiangrui") | (col("firstName") == "michael"))
    .sort(asc("lastName"))
)
display(filterDF)
# Alternatively, use where() as the next cell does.

firstName,lastName,email,salary
michael,armbrust,no-reply@berkeley.edu,100000
michael,jackson,no-reply@neverla.nd,80000
xiangrui,meng,no-reply@stanford.edu,120000
xiangrui,meng,no-reply@stanford.edu,120000


In [0]:
# Same predicate and ordering as the preceding filter() cell, written with where().
whereDF = (
    flattenDF
    .where((col("firstName") == "xiangrui") | (col("firstName") == "michael"))
    .sort(asc("lastName"))
)
display(whereDF)

firstName,lastName,email,salary
michael,armbrust,no-reply@berkeley.edu,100000
michael,jackson,no-reply@neverla.nd,80000
xiangrui,meng,no-reply@stanford.edu,120000
xiangrui,meng,no-reply@stanford.edu,120000


## Replacing values

In [0]:
# Replace null values with the placeholder "--".
nonNullDF = flattenDF.na.fill("--")
display(nonNullDF)

firstName,lastName,email,salary
michael,armbrust,no-reply@berkeley.edu,100000
xiangrui,meng,no-reply@stanford.edu,120000
matei,--,no-reply@waterloo.edu,140000
--,wendell,no-reply@berkeley.edu,160000
michael,jackson,no-reply@neverla.nd,80000
--,wendell,no-reply@berkeley.edu,160000
xiangrui,meng,no-reply@stanford.edu,120000
matei,--,no-reply@waterloo.edu,140000


## Aggregating data (sum, count, groupby, summary, min, max, ...)

In [0]:
from pyspark.sql.functions import countDistinct

# For each firstName, count how many different lastName values occur.
# The "--" placeholder is counted like any other value.
countDistinctDF = (
    nonNullDF
    .select("firstName", "lastName")
    .groupBy("firstName")
    .agg(countDistinct("lastName").alias("distinct_last_names"))
)

display(countDistinctDF)

firstName,distinct_last_names
xiangrui,1
matei,1
michael,2
--,1


In [0]:
# Total of the salary column over the whole frame (an empty groupBy() means "no groups").
salarySumDF = nonNullDF.groupBy().agg({"salary": "sum"})
display(salarySumDF)

sum(salary)
1020000


In [0]:
# Summary statistics for the salary column, printed as plain text.
salary_stats = nonNullDF.describe("salary")
salary_stats.show()

## Clean up Parquet file

In [0]:
# Delete the example Parquet output now that it is no longer needed.
parquet_path = "/tmp/databricks-df-example.parquet"
dbutils.fs.rm(parquet_path, True)

## Working with functions

### Create sample dataset

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Example dataset: a small pipe-delimited CSV with a header line.
sample_csv_path = "/tmp/dataframe_sample.csv"
sample_csv_text = """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
"""

# Drop any stale copy of the file, then write it afresh.
dbutils.fs.rm(sample_csv_path, True)
dbutils.fs.put(sample_csv_path, sample_csv_text, True)

# Read it back; no schema is supplied, so check the printed schema for the column types.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "|")
    .load(sample_csv_path)
)
df.printSchema()

### Using built-in functions

In [0]:
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only by stripping the trailing " HH:MM:SS..." part.
# The pattern is a raw string so that `\d` is passed to the regex engine as written;
# in a normal string literal `\d` is an invalid escape sequence and triggers a warning.
df = df.withColumn('date_only', F.regexp_replace(df.end_date, r' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field: "CA-SF" -> "SF"
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function: whole days from start_date to end_date
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))

In [0]:
# Make the DataFrame queryable from SQL under the name "sample_df", then select everything back.
df.createOrReplaceTempView("sample_df")
sample_rows = sql("select * from sample_df")
display(sample_rows)

id,end_date,start_date,location,date,date_only,city,date_diff
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF,2015-10-14,2015-10-14,SF,30
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD,2015-10-15,2015-10-15,SD,62
3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY-NY,2015-10-16,2015-10-16,NY,275
4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY-NY,2015-10-17,2015-10-17,NY,245
5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA-SD,2015-10-18,2015-10-18,SD,552


### Convert to JSON format

In [0]:
# Convert the DataFrame with toJSON() and keep the result for inspection.
rdd_json = df.toJSON()
# Peek at the first two converted records (shown as the cell's output).
rdd_json.take(2)

### Create user-defined function (UDF)

In [0]:
from pyspark.sql import functions as F

# UDF that adds two integers. It is created through `F.udf` because this notebook
# only imports the functions module as `F`; a bare `udf` would rely on a name the
# runtime happens to inject. A null operand yields null instead of raising
# TypeError on `None + int` inside the worker.
add_n = F.udf(lambda x, y: None if x is None or y is None else x + y, IntegerType())

# Add an `id_offset` column holding 1000 + id; `id` is cast to an Integer type first.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))

### ... and pass the parameter to UDF

In [0]:
# any constants used by UDF will automatically pass through to workers
N = 90

# True when the value is strictly below N. A null input evaluates to False rather
# than raising TypeError on `None < N`. Created via `F.udf` because the notebook
# imports the functions module as `F` and never imports a bare `udf`.
last_n_days = F.udf(lambda x: x is not None and x < N, BooleanType())

# Keep only the rows whose date_diff is below N.
df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)

id,end_date,start_date,location,date,date_only,city,date_diff,id_offset
1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF,2015-10-14,2015-10-14,SF,30,1001
2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD,2015-10-15,2015-10-15,SD,62,1002


In [0]:
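# NOTE(review): presumably meant to be a markdown cell (Databricks `%md`); the heading below is only a code comment here.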
### Aggregate over multiple columns

In [0]:
# Per-location aggregates: smallest id, number of rows, and mean date_diff.
# `id` comes from the CSV as a string (an earlier cell has to cast it to Integer),
# so min("id") would compare text and rank "10" below "9". Cast it to int for a
# numeric minimum; the alias keeps the original output column name.
agg_df = df.groupBy("location").agg(
    F.min(F.col("id").cast("int")).alias("min(id)"),
    F.count("id"),
    F.avg("date_diff"),
)
display(agg_df)

location,min(id),count(id),avg(date_diff)
CA-SD,2,2,307.0
CA-SF,1,1,30.0
NY-NY,3,2,260.0


### Store the data as Parquet files partitioned by time (end year and end month)

In [0]:
# Derive the partition keys from end_date.
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))

# Write one directory level per end_year / end_month value.
# mode("overwrite") lets the cell be re-run: without it a second run fails
# because /tmp/sample_table already exists.
df.write.mode("overwrite").partitionBy("end_year", "end_month").parquet("/tmp/sample_table")

# List what was written at the top level of the table directory.
display(dbutils.fs.ls("/tmp/sample_table"))

path,name,size
dbfs:/tmp/sample_table/_SUCCESS,_SUCCESS,0
dbfs:/tmp/sample_table/end_year=2015/,end_year=2015/,0
