# Initialize Environment

In [None]:
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Build (or reuse, via getOrCreate) a local SparkSession that runs with 3 threads.
spark = (
    SparkSession.builder
    .appName("Dataframe operations")
    .master("local[3]")
    .getOrCreate()
)

# 1. Create Dataframe

In [None]:
#-------------------------------------------------------------------------

# create dummy data
columns = ["languages", "users_count"]
data = [("Java", "20000"), ("Python", "10000"), ("Scala", "5000")]

#-------------------------------------------------------------------------

rdd = spark.sparkContext.parallelize(data)

# 1.1 Using toDF() function

# Data types are automatically inferred from the Python values;
# alternatively a schema can be supplied to set the data types explicitly.
# Without arguments toDF() does not receive any column names.
df = rdd.toDF()
# printSchema() only prints and returns None, so its result must not be
# assigned back to df.
df.printSchema()

# Passing a list of names sets the column names.
df = rdd.toDF(columns)
df.printSchema()

#-------------------------------------------------------------------------

# 1.2 Using createDataFrame() function

# createDataFrame() builds a DataFrame from the local Python list;
# toDF(*columns) then assigns the column names.
df = spark.createDataFrame(data).toDF(*columns)
df.printSchema()

# using Row type
# Each tuple is unpacked into a Row; the column names are passed separately.
# map() is lazy, so rowData is an iterator consumed by createDataFrame().
from pyspark.sql.types import Row
rowData = map(lambda x: Row(*x), data)
df = spark.createDataFrame(rowData, columns)
df.show()

# Define schema using StructType and StructField
# Each StructField takes (name, data type, nullable).
data = [
    ("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
# truncate=False prints full cell values instead of shortening long ones.
df.show(truncate=False)

#-------------------------------------------------------------------------

# 2. StructType and StructField

In [None]:
#-------------------------------------------------------------------------

# 2.1 Define nested StructType object struct
# The first element of each tuple is itself a tuple that fills the nested
# `name` struct (firstname, middlename, lastname).

data = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]

# A StructType used as the data type of a StructField creates a nested struct column.
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 2.2 Adding and changing the struct of the dataframe using struct() function

# struct() packs id (renamed to identifier), gender, salary and a derived
# salary_grade (< 2000 -> Low, < 4000 -> Medium, otherwise High) into one
# struct column OtherInfo; the now-redundant top-level columns are dropped.
df1 = df.withColumn("OtherInfo",struct(
                        col("id").alias("identifier"),
                        col("gender").alias("gender"),
                        col("salary").alias("salary"),
                        when(col("salary").cast(IntegerType()) < 2000, "Low")
                        .when(col("salary").cast(IntegerType()) < 4000, "Medium")
                        .otherwise("High").alias("salary_grade")

                    )).drop("id", "gender", "salary")

df1.printSchema()
df1.show(truncate=False)

#-------------------------------------------------------------------------

# 2.3 Using SQL ArrayType and MapType

# Python lists fill the ArrayType column and dicts fill the MapType column.
array_data = [
    (("James","","Smith"),["Cricket", "Football", "Chess"], {"eyecolor" : "blue", "tall" : "true"}),
    (("Michael","Rose",""),["Cricket", "Football", "Chess"], {"eyecolor" : "blue", "tall" : "true"}),
    (("Robert","","Williams"),["Cricket", "Football", "Chess"], {"eyecolor" : "blue", "tall" : "true"}),
    (("Maria","Anne","Jones"),["Cricket", "Football", "Chess"], {"eyecolor" : "blue", "tall" : "true"}),
    (("Jen","Mary","Brown"),["Cricket", "Football", "Chess"], {"eyecolor" : "blue", "tall" : "true"})
  ]


# hobbies: array of strings; properties: map from string keys to string values.
array_structure_schema = StructType([
    StructField("name", StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True)
    ])),
    StructField("hobbies", ArrayType(StringType()), True),
    StructField("properties", MapType(StringType(), StringType(), True))
])

df2 = spark.createDataFrame(array_data, array_structure_schema)
df2.printSchema()
df2.show(truncate=False)
# simpleString() renders the whole schema as one compact type string.
print(df2.schema.simpleString())

#-------------------------------------------------------------------------

# 2.4 Creating StructType object from JSON

# schema.json() serializes a StructType to a JSON string and
# StructType.fromJson() rebuilds it from the parsed dict. A schema kept in a
# JSON file is loaded the same way after reading the file's text.
# This round-trips the nested `schema` and `data` defined in section 2.1.
import json

schema_from_json = StructType.fromJson(json.loads(schema.json()))
# A separate name keeps `df` from 2.1 intact for the checks in 2.6.
df_from_json = spark.createDataFrame(data, schema_from_json)
df_from_json.printSchema()

#-------------------------------------------------------------------------

# 2.5 Creating schema using DDL string

# A DDL-formatted string can be passed directly as the `schema` argument of
# createDataFrame(); Spark parses it into the equivalent StructType.
ddl_schema_str = """
                `fullname` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,
                `age` INT,
                `gender` STRING
             """

df_ddl = spark.createDataFrame([(("James", "", "Smith"), 36, "M")], schema=ddl_schema_str)
df_ddl.printSchema()
df_ddl.show(truncate=False)

#-------------------------------------------------------------------------

# 2.6 Checking if column exists in a dataframe

# df.columns and df.schema.fieldNames() list only top-level column names, so a
# field nested inside a struct (e.g. name.firstname) is not found this way.
print("firstname" in df.schema.fieldNames())
print("id" in df.columns)
# A StructField matches only if its name, data type, nullability (and
# metadata) all agree with a field of the schema.
print(StructField("id", StringType(), True) in df.schema.fields)

#-------------------------------------------------------------------------

# 3. Row Class

In [None]:
#-------------------------------------------------------------------------

from pyspark.sql.types import Row

#-------------------------------------------------------------------------

# Key points for Row class

# Before Spark 3.0, a Row created with named arguments had its fields sorted by name.
# Since Spark 3.0, Rows created from named arguments keep their fields in the
# order in which the arguments were passed instead of sorting them alphabetically.
# To re-enable sorting by name, set the environment variable
# PYSPARK_ROW_FIELD_SORTING_ENABLED to true.
# The Row class can also be used to create a struct-type column.

#-------------------------------------------------------------------------

# 3.1 Create a Row Object

# Row class extends tuple class, so values can be read by position.

row = Row("James", 40)
print(f"{row[0]} , {row[1]}")

# Row class accepts named arguments; the values are then readable as attributes.
row = Row(name="Alice", age=11)
print(f"{row.name}")

#-------------------------------------------------------------------------

# 3.2 Create custom class from Row

# Row("name", "age") acts as a reusable record type: calling it with values
# creates Rows whose fields are named name and age.
Person = Row("name", "age")
p1 = Person("James", 40)
p2 = Person("Bob", 50)
print(f"{p1.name}")

#-------------------------------------------------------------------------

# 3.3 Using Row class on RDD

# When an RDD is built from Row objects, collect() returns the data as Row objects too.

data = [
    Row(name="James,,Smith", lang=["Java", "Scala", "C++"], state="CA"),
    Row(name="Michael,,Rose", lang=["Java", "Scala", "GoLang"], state="NJ"),
    Row(name="Robert,,Williams", lang=["Java", "CSharp"], state="NV")
]

rdd = spark.sparkContext.parallelize(data)

# Collect once and reuse the result instead of running a second Spark job.
collected_data = rdd.collect()
print(collected_data)

for row in collected_data:
    print(f"{row.name} , {row.lang}")

#-------------------------------------------------------------------------

# 3.4 Using Row class on DataFrame

df = spark.createDataFrame(data) # column names are inferred from named arguments
df.printSchema()
df.show()

# toDF() renames columns by position. Since Spark 3.0 the Row fields keep
# their declaration order (name, lang, state), so the new names follow that
# order. (With PYSPARK_ROW_FIELD_SORTING_ENABLED=true the order would be
# lang, name, state instead.)
df = spark.createDataFrame(data).toDF(*["name", "languages_at_school", "current_state"])
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 3.5 Create Nested Struct using Row class

# A Row nested inside another Row becomes a struct column (here `prop`).
data = [
    Row(name="James", prop=Row(hair="black", eye="blue")),
    Row(name="Ann", prop=Row(hair="grey", eye="black"))
]

df = spark.createDataFrame(data)
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 4. Column Class

In [None]:
#-------------------------------------------------------------------------

from pyspark.sql.types import Row

#-------------------------------------------------------------------------

# 4.1 Create Column class object

# lit() wraps a literal value in a Column object; printing it shows the
# Column's expression.
col_obj = lit("Hello World")
print(col_obj)

data = [
    ("James", 23),
    ("Bob", 40)
]

# The first column name contains a dot, so it is quoted with backticks when
# referenced by name below; otherwise the dot is read as struct-field access.
df = spark.createDataFrame(data).toDF("name.fname", "age")
df.printSchema()

# access column from dataframe

# using df object

df.select(df.age).show()
df.select(df["age"]).show()
df.select(df["`name.fname`"]).show()

# using col function

df.select(col("age")).show()
df.select(col("`name.fname`")).show()

# access struct type column values

data = [
    Row(name="James", props=Row(eye="blue", hair="black")),
    Row(name="Bob", props=Row(eye="brown", hair="white"))
]

df = spark.createDataFrame(data)
df.printSchema()

# Nested fields can be reached by attribute chaining, a dotted name or col();
# "props.*" expands every field of the struct into its own column.
df.select(df.props.hair).show()
df.select(df["props.hair"]).show()
df.select(col("props.hair")).show()
df.select(col("props.*")).show()

#-------------------------------------------------------------------------

# 4.2 Column Operators

data = [
    (100, 2, 1),
    (200, 3, 4),
    (300, 4, 4)
]

df = spark.createDataFrame(data).toDF("col1", "col2", "col3")

# arithmetic operations
# Python operators applied to Column objects build Spark expressions.
df.select(df.col1 + df.col2,
          df.col1 - df.col2,
          df.col1 * df.col2,
          df.col1 / df.col2,
          df.col1 % df.col2).show()

# logical comparison
# Comparisons produce boolean columns; only the first one is given an alias.

df.select((df.col2 > df.col3).alias("greater than comparison"), df.col2 < df.col3, df.col2 == df.col3).show()

#-------------------------------------------------------------------------

# 4.3 Column functions

# Row 1 has a null gender, row 3 an empty-string gender and row 4 a null lname.
data = [
    ("James", "Bond", "100", None),
    ("Bob", "Dylan", "200", "F"),
    ("Tom Cruise","XXX", "400", ""),
    ("Tom Brand", None, "400", 'M')
]

df = spark.createDataFrame(data).toDF("fname", "lname", "id", "gender")
df.show()

# alias()

df.select(df.fname.alias("first_name"), col("lname").alias("last_name"), expr("fname || ',' || lname").alias("fullname")).show()

# sort(), asc() and desc() function

df.sort(col("fname").asc()).show()
df.sort(df.fname.desc()).show()

# cast() and astype() -- astype() is an alias of cast()

df.select(df.fname, df.id.cast("int")).printSchema()
df.select(df.fname, df.id.astype("int")).printSchema()

# between()

df.filter(df.id.between(100, 300)).show()

# contains()

df.filter(df.fname.contains("Cruise")).show()

# startswith() and endswith()

df.filter(df.fname.startswith("T")).show()
df.filter(df.lname.endswith("Dylan")).show()

# isNull() and isNotNull()

df.filter(df.gender.isNull()).show()
df.filter(df.lname.isNotNull()).show()

# like() and rlike()
# like() uses SQL wildcards (% = any sequence); rlike() uses a regular expression.

df.select(col("fname"), col("id")).filter(col("fname").like("%om%")).show()
df.select(col("fname"), col("id")).filter(col("fname").rlike("^T")).show()

# when() and otherwise()
# A null test must use isNull(): `df.gender == None` builds the SQL predicate
# `gender = NULL`, which evaluates to null and therefore never matches.

df.select(col("fname"),
          when(df.gender == 'M', "Male")
          .when(df.gender == 'F', "Female")
          .when(df.gender.isNull(), '')
          .otherwise(df.gender).alias("new_gender")
         ).show()

# isin()

li = ["100",  "200"]
df.select(df.id).filter(df.id.isin(li)).show()

# getField() and getItem()

# Each row holds a (fname, lname) struct, an array of languages and a map of properties.
data = [
    (("James", "Bond"), ["Java", "C++"], {'hair':'black', 'eye':'brown'}),
    (("Bob", "Dylan"), ["C#", "C++"], {'hair':'white', 'eye':'blue'}),
    (("John", "Doe"), ["Python", "C++"], {'hair':'black', 'eye':'brown'}),
    (("Mark", "Lee"), ["Scala", "C++"], {'hair':'white', 'eye':'blue'})
]

schema = StructType([
    StructField("name", StructType([
        StructField("fname", StringType(), True),
        StructField("lname", StringType(), True)
    ])),
    StructField("languages", ArrayType(StringType()), True),
    StructField("props", MapType(StringType(), StringType()), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

# getField()
# On the map column the argument is a key; on the struct column it is a field name.

df.select(df.props.getField("hair"), df.props.eye).show() # from Map
df.select(col("name.fname"), df.name.getField("lname")).show() # from Struct

# getItem()
# getItem(1) on the array returns the element at 0-based index 1 (the second
# language); getItem("fname") on the struct returns that field.
df.select(df.languages.getItem(1)).show()
df.select(df.name.getItem("fname")).show()

#-------------------------------------------------------------------------

# 5. Select

In [None]:
#-------------------------------------------------------------------------

data = [
    ("Bob", "Dylan", "USA", "CA"),
    ("John", "Smith", "USA", "NY"),
    ("James", "Bond", "USA", "CA"),
    ("Mark", "Lee", "USA", "FL")
]

columns = ["fname", "lname", "country", "state"]

df = spark.createDataFrame(data).toDF(*columns)
df.show(truncate=False)

#-------------------------------------------------------------------------

# 5.1 Select Single and Multiple Columns

# Columns can be given as attributes, plain names, col() objects or df[...] items.
df.select(df.fname, "lname", col("country"), df["state"]).show()
# colRegex() takes a backtick-quoted regex; this one selects every column
# whose name ends in "name" (fname, lname).
df.select(df.colRegex("`^.*name$`")).show()

#-------------------------------------------------------------------------

# 5.2 Select all columns from a List

df.select(columns).show()
# The loop variable is `c` so it is not confused with pyspark's col().
df.select([c for c in df.columns]).show()
df.select("*").show()

#-------------------------------------------------------------------------

# 5.3 Select columns by index

# df.columns is a plain Python list of names, so slicing picks columns by position.
df.select(df.columns[:3]).show()
df.select(df.columns[2:4]).show()

#-------------------------------------------------------------------------

# 5.4 Select Nested Struct columns


data = [
        (("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
     ])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show(truncate=False)

# Select the whole struct, individual nested fields, or all nested fields
# flattened into top-level columns with "name.*".
df.select(df.name).show()
df.select(df.name.firstname, col("name.lastname")).show()
df.select(col("name.*")).show()

#-------------------------------------------------------------------------

# 6. Collect

In [None]:
#-------------------------------------------------------------------------

dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40)
]
dept_cols = ["dept_name", "dept_id"]

df = spark.createDataFrame(dept).toDF(*dept_cols)
df.show()

# collect all the data at driver node
# collect() returns the rows as a Python list of Row objects.

collected_data = df.collect()
print(collected_data)

# Row fields can be read by attribute, by key or by position.
for row in collected_data:
    print(f"{row.dept_name},{row.dept_id} || {row['dept_name']},{row['dept_id']} || {row[0]},{row[1]}")

# First row, first column value. It is printed explicitly because a bare
# expression in the middle of a cell is never displayed, and the already
# collected list is reused instead of running another Spark job.
print(collected_data[0][0])

collected_data = df.select("dept_name").collect()
print(collected_data)

#-------------------------------------------------------------------------

# 7. withColumn

In [None]:
#-------------------------------------------------------------------------

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
# A list of names as `schema` only names the columns; the types are inferred.
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show()

# None of the examples below assign their result back to df, so each one
# starts from this same DataFrame.

#-------------------------------------------------------------------------

# 7.1 change data type of a column

df.withColumn("salary", col("salary").cast(IntegerType())).printSchema()

#-------------------------------------------------------------------------

# 7.2 update the values of a column

df.withColumn("salary", expr("salary * 100")).show()

#-------------------------------------------------------------------------

# 7.3 create column from existing column

df.withColumn("copied_column", col("salary") * -1).show()

#-------------------------------------------------------------------------

# 7.4 add new column in a data frame
# lit() supplies the same constant value for every row.

df.withColumn("country", lit("USA")).withColumn("state", lit("CA")).show()

#-------------------------------------------------------------------------

# 7.5 rename column

df.withColumnRenamed("gender", "sex").show()

#-------------------------------------------------------------------------

# 7.6 drop column

df.drop("salary").show()

#-------------------------------------------------------------------------

# 8. withColumnRenamed

In [None]:
#-------------------------------------------------------------------------

dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

# Four top-level columns: name (struct), dob, gender, salary.
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])


df = spark.createDataFrame(dataDF, schema)
df.printSchema()

#-------------------------------------------------------------------------

# 8.1 rename df column

df.withColumnRenamed("dob", "date_of_birth").printSchema()

#-------------------------------------------------------------------------

# 8.2 rename multiple columns

df.withColumnRenamed("dob", "date_of_birth").withColumnRenamed("gender", "sex").printSchema()

#-------------------------------------------------------------------------

# 8.3 using StructType to rename nested columns
# Casting the struct to a StructType with the same field count and types but
# different names renames the nested fields positionally.

schema2 = StructType([
    StructField("fname", StringType()),
    StructField("middlename", StringType()),
    StructField("lname", StringType())
])

df.withColumn("name", col("name").cast(schema2)).printSchema()

#-------------------------------------------------------------------------

# 8.4 using select to rename nested elements
# Only the selected nested fields survive; every other column is left out.

df.select(col("name.firstname").alias("fname"), col("name.lastname").alias("lname")).printSchema()

#-------------------------------------------------------------------------

# 8.5 using withColumn to rename nested columns
# The nested values are copied into new top-level columns; the original
# `name` struct is still kept in the result.

df.withColumn("fname", col("name.firstname")).withColumn("lname", col("name.lastname")).printSchema()

#-------------------------------------------------------------------------

# 8.6 using toDF() to change all column names in df
# toDF() renames positionally, so new_cols needs one name per top-level column.

new_cols = ["col1", "col2", "col3", "col4"]
df.toDF(*new_cols).printSchema()

#-------------------------------------------------------------------------

# 9. where() and filter()

In [None]:
#-------------------------------------------------------------------------

# Sample rows with a nested name struct, an array of languages, state and gender.
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]

schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show(truncate=False)

#-------------------------------------------------------------------------

# 9.1 filter with column condition
# ~ negates a boolean column expression (the Column equivalent of `not`).

df.filter(df.state == 'OH').show()
df.filter(df.state != 'OH').show()
df.filter(~(df.state == 'OH')).show()
df.filter(col("state") == 'OH').show()

#-------------------------------------------------------------------------

# 9.2 filter with SQL expression
# The condition is a SQL string; != and <> are both "not equal".

df.filter("gender == 'M'").show()
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()

#-------------------------------------------------------------------------

# 9.3 filter with multiple conditions
# Each condition needs its own parentheses because & binds tighter than ==.

df.filter((df.state == 'OH') & (df.gender=='M')).show()

#-------------------------------------------------------------------------

# 9.4 filter based on list values

li = ['OH', 'CA']

df.filter(df.state.isin(li)).show()
df.filter(~(df.state.isin(li))).show()
# Compare against the boolean literal False rather than the string 'False',
# which relies on Spark implicitly casting the string to a boolean. `==` is
# required (not `is`/`not`) because it must build a Column expression.
df.filter(df.state.isin(li) == False).show()  # noqa: E712

#-------------------------------------------------------------------------

# 9.5 filter based on startswith(), endswith(), contains()

df.filter(col("state").startswith("N")).show()
df.filter(df["state"].endswith("Y")).show()
df.filter(df.state.contains("O")).show()

#-------------------------------------------------------------------------

# 9.6 filter on array column
# array_contains() keeps rows whose languages array includes "Java".

df.filter(array_contains(df.languages, "Java")).show()

#-------------------------------------------------------------------------

# 9.7 filter on nested struct columns

df.filter(df.name.lastname=='Williams').show(truncate=False)

#-------------------------------------------------------------------------

# 9.8 like and rlike()

data = [
    (2,"Shane Bond"),
    (3, "James Bond"),
    (4, "Mark bond"),
    (5, "Brett Lee")
]

df = spark.createDataFrame(data=data).toDF("id", "name")
df.show()

# case sensitive comparison
# like() uses SQL wildcards: % matches any sequence of characters, so only
# the lower-case "bond" row matches.

df.filter(df.name.like("%bond%")).show()

# case in-sensitive comparison using rlike(regex like)
# (?i) enables case-insensitive matching. rlike() finds the pattern anywhere
# in the value, so no leading wildcard or anchor is needed.

df.filter(df.name.rlike("(?i)bond")).show()

#-------------------------------------------------------------------------

# 10. distinct() and dropDuplicates()

In [None]:
#-------------------------------------------------------------------------

# ("James", "Sales", 3000) appears twice, making it the only fully duplicated row.
data = [
        ("James", "Sales", 3000),
        ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100),
        ("Maria", "Finance", 3000),
        ("James", "Sales", 3000),
        ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900),
        ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000),
        ("Saif", "Sales", 4100),
]

columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.orderBy("department", "salary").show(truncate=False)

#-------------------------------------------------------------------------

# 10.1 get distinct rows by comparing all columns using distinct() function

df1 = df.distinct()
df1.orderBy("department", "salary").show()

# 10.2 get distinct rows by using dropDuplicates() function
# Without arguments dropDuplicates() compares all columns, like distinct().
df1 = df.dropDuplicates()
df1.orderBy(col("department"), col("salary")).show()

# 10.3 get distinct rows considering few columns by using dropDuplicates() function
# Keeps one row per (department, salary) pair, e.g. only one of Robert/Saif
# (Sales, 4100); this code does not control which of them survives.
df1 = df.dropDuplicates(["department", "salary"])
df1.orderBy(df1.department, df1.salary).show()

#-------------------------------------------------------------------------

# 11. orderBy() and sort()

In [None]:
#-------------------------------------------------------------------------

"""
We can use either sort() or orderBy() function
to sort df by ascending or descending order
based on single or multiple columns.
"""
#-------------------------------------------------------------------------

simpleData = [
    ("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)

#-------------------------------------------------------------------------

# 11.1 sort() function
# Sorts by department, then by state within each department.

df.sort("department", "state").show()

#-------------------------------------------------------------------------

# 11.2 orderBy() function
# Same ordering as 11.1, expressed with orderBy() and col() objects.

df.orderBy(col("department"), col("state")).show()

#-------------------------------------------------------------------------

# 11.3 asc() and desc() methods
# department ascending, state descending; both lines produce the same order.

df.sort(col("department").asc(), col("state").desc()).show()
df.orderBy(df.department.asc(), df.state.desc()).show()

#-------------------------------------------------------------------------

# 12. groupBy()

In [None]:
#-------------------------------------------------------------------------

simpleData = [
    ("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)

#-------------------------------------------------------------------------

# sum and avg come from the `from pyspark.sql.functions import *` at the top
# of the notebook, so `sum` here shadows Python's built-in sum().
df1 = df.groupBy("department").agg(
    sum("salary").alias("total_salary"), avg("salary").alias("average_salary")
)
df1.show()

# Shortcut aggregations on the grouped data; result columns are named by Spark.
df1 = df.groupBy("department").max("salary")
df1.show()

df1 = df.groupBy("department").count()
df1.show()

# Grouping by two columns and summing two columns at once.
df1 = df.groupBy("department", "state").sum("salary", "bonus")
df1.show()

#-------------------------------------------------------------------------

# filter on aggregate data
# Filtering after agg() works like SQL HAVING: the condition refers to the
# aggregated alias total_salary.

df1 = df.groupBy("department").agg(
    sum("salary").alias("total_salary")
).where(col("total_salary") >= 200000)

df1.show()

#-------------------------------------------------------------------------

# 13. Join

In [None]:
#-------------------------------------------------------------------------

# emp_dept_id is stored as a string while dept_id below is numeric. Employee 6
# has department 50, which has no match in deptDF; employee 1 has
# superior_emp_id -1, which matches no employee.
emp = [
    (1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
]

empColumns = ["emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [
    ("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
]

deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)


#-------------------------------------------------------------------------

# inner, left, right, full, leftsemi, leftanti
# These are join types that can be passed as the third argument of join();
# only the inner join is demonstrated here.

df = empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner")
df.show()

# self join
# empDF is aliased twice so employee (emp1) and superior (emp2) columns can be
# told apart; the left join keeps employees without a matching superior.

df = empDF.alias("emp1").join(empDF.alias("emp2"), col("emp1.superior_emp_id") == col("emp2.emp_id") ,"left").select(
    col("emp1.emp_id"), col("emp1.name"), col("emp2.emp_id").alias("superior_emp_id"), col("emp2.name").alias("manager_name"))
df.show()

#-------------------------------------------------------------------------

# 14. union() and unionAll()

In [None]:
#-------------------------------------------------------------------------

simpleData = [
    ("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000)
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df1 = spark.createDataFrame(data=simpleData, schema=columns)
df1.printSchema()
df1.show(truncate=False)

# James and Maria appear in both inputs, so the unions below contain duplicates.
simpleData2 = [
    ("James","Sales","NY",90000,34,10000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]
df2 = spark.createDataFrame(data=simpleData2, schema=columns2)
df2.printSchema()
df2.show(truncate=False)

#-------------------------------------------------------------------------

# union() and unionAll() both keep duplicate rows; distinct() is applied
# explicitly in the last line to remove them.
df1.union(df2).show()
df1.unionAll(df2).show()
df1.union(df2).distinct().show()

#-------------------------------------------------------------------------

# 15. UDF()

In [None]:
#-------------------------------------------------------------------------

columns = ["Seqno","Name"]

# Lower-case names, so the case-converting UDFs below have a visible effect.
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")
]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

#-------------------------------------------------------------------------

# 15.1 create udf and use it as column object expression

def convert_case(name):
    """Upper-case the first letter of each whitespace-separated word.

    The rest of each word is left unchanged and the words are re-joined with
    single spaces, so the result carries no trailing space. A null input
    (None) is returned unchanged instead of raising inside the Spark task.

    Args:
        name: the string to convert, or None.

    Returns:
        The converted string, or None when *name* is None.
    """
    if name is None:
        return None
    return " ".join(word[0].upper() + word[1:] for word in name.split())

# udf() wraps the Python function as a Spark UDF that returns StringType.
convert_case_udf = udf(convert_case, StringType())

df1 = df.withColumn("Name", convert_case_udf(col("Name")))
df1.show()

df2 = df.select("Seqno", convert_case_udf(col("Name")).alias("Name"))
df2.show()

#-------------------------------------------------------------------------

# 15.2 create udf and use it as SQL function

# register() makes the function callable by this name inside Spark SQL.
spark.udf.register("convert_case_udf", convert_case, StringType())

# The temp view exposes df to SQL as the table student_tbl.
df.createOrReplaceTempView("student_tbl")

df3 = spark.sql("select Seqno, convert_case_udf(Name) as `Name` from student_tbl")
df3.show()

#-------------------------------------------------------------------------

# 15.3 creating udf using annotation

@udf(returnType=StringType())
def upper_case(string):
    """Return *string* in upper case; a null input (None) stays None.

    Null column values reach a Python UDF as None, and None.upper() would
    raise and fail the Spark task, so they are passed through unchanged.
    """
    if string is None:
        return None
    return string.upper()

df.withColumn("Name", upper_case(col("Name"))).show()

#-------------------------------------------------------------------------

# 15.4 Null check

def capitalize(string):
    """
    Return *string* with its first character upper-cased and the rest
    lower-cased; a null (None) or empty input yields "".

    Handling the null case inside the function itself keeps the UDF from
    failing on null column values.
    """
    if not string:
        return ""
    return string.capitalize()

cols = [
    "Seqno",
    "Name"
]

# The fourth row has a null Name to exercise the UDF's null handling.
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)
]

df = spark.createDataFrame(data=data,schema=cols)
df.show()

capitalize_udf = udf(capitalize, StringType())

df.withColumn("Name", capitalize_udf(col("Name"))).show()

#-------------------------------------------------------------------------

# 16. map()

In [None]:
#-------------------------------------------------------------------------

"""
1. map() transformation can be only applied on RDD's.
Hence to apply map transformation on DataFrame, convert
it into RDD and then apply transformation.
"""
#-------------------------------------------------------------------------

# 16.1 map() on rdd
# map() produces exactly one output element, a (word, 1) pair, per input word.

data = [
    "Project",
    "Gutenberg’s",
    "Alice’s",
    "Adventures",
    "in",
    "Wonderland",
    "Project",
    "Gutenberg’s",
    "Adventures",
    "in",
    "Wonderland",
    "Project",
    "Gutenberg’s"
]

word_rdd = spark.sparkContext.parallelize(data)

paired_rdd = word_rdd.map(lambda x: (x,1))

for el in paired_rdd.collect():
    print(el)

#-------------------------------------------------------------------------

# 16.2 map() on DataFrame


data = [
  ('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62),
]

columns = ["firstname","lastname","gender","salary"]

df = spark.createDataFrame(data=data, schema=columns)
df.show()

# convert df to rdd to apply map transformation
# Each Row becomes a ("first,last", gender, salary * 3) tuple and toDF()
# names the resulting columns.
rdd2 = df.rdd.map(lambda x: (x.firstname + "," + x.lastname, x.gender, x.salary * 3))
df2 = rdd2.toDF(["name", "gender", "salary"])
df2.show()

"""

Different ways to refer columns inside a map function.

1. using index:
rdd2 = df.rdd.map(lambda x: (x[0] + "," + x[1], x[2], x[3] * 2))

2. using column names:
rdd2 = df.rdd.map(lambda x: (x["firstname"] + "," + x["lastname"], x["gender"], x["salary"] * 2))
rdd2 = df.rdd.map(lambda x: (x.firstname + "," + x.lastname, x.gender, x.salary * 2))

3. create a custom function to perform an operation:
def func(x):
    fname = x.firstname
    lname = x.lastname
    gender = x.gender.lower()
    sal = x.salary * 2
    return (fname + ',' + lname, gender, sal)

rdd2 = df.rdd.map(lambda x: func(x))

"""

#-------------------------------------------------------------------------

# 17. flatMap()

In [None]:
#-------------------------------------------------------------------------

# 17.1 flatMap() on rdd

data = [
    "Project Gutenberg’s",
    "Alice’s Adventures in Wonderland",
    "Project Gutenberg’s",
    "Adventures in Wonderland",
    "Project Gutenberg’s"
]

rdd = spark.sparkContext.parallelize(data)
for el in rdd.collect():
    print(el)

print("\n")

# flatMap() maps each line to a list of words and flattens those lists,
# so rdd2 holds one element per word instead of one per line
rdd2 = rdd.flatMap(lambda x: x.split())
for el in rdd2.collect():
    print(el)

print("\n")

paired_rdd = rdd2.map(lambda x: (x, 1))

# word count with groupByKey(): every (word, 1) pair is shuffled,
# then the grouped values of each key are counted
grouped_rdd = paired_rdd.groupByKey().mapValues(len)
for el in grouped_rdd.collect():
    print(el)

print("\n")

# the same counts with reduceByKey(), which merges values locally on each
# partition before shuffling and is the preferred way to aggregate per key
reduced_by_key_rdd = paired_rdd.reduceByKey(lambda x, y: x + y)
for el in reduced_by_key_rdd.collect():
    print(el)

print("\n")

#-------------------------------------------------------------------------

# 17.2 flatMap() on DataFrame

arrayData = [
    ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
    ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
    ('Robert',['CSharp',''],{'hair':'red','eye':''}),
    ('Washington',None,None),
    ('Jefferson',['1','2'],{})
]

df = spark.createDataFrame(data=arrayData, schema=['name', 'knownLanguages', 'properties'])

# explode() on an array column: one row per element; rows whose array is
# null or empty produce no output rows
df1 = df.select("name", explode(col("knownLanguages")).alias("lang"))
df1.show()

# explode() on a map column: one row per entry, split into key and value columns
df2 = df.select("name", explode(col("properties")).alias("part", "color"))
df2.show()

# flatMap() on the underlying RDD: emit every language of every row, or
# "unknown" when the list is null or empty. None and '' elements inside a
# list are passed through unchanged. The result is an RDD of plain str values.
rdd1 = df.rdd.flatMap(lambda x: x.knownLanguages or ["unknown"])
# wrap each value in a 1-tuple so toDF() can build a single-column DataFrame
rdd1 = rdd1.map(lambda x: (x,))  # or rdd1.map(lambda x: Row(x))
df3 = rdd1.toDF(["lang"])
df3.show()

#-------------------------------------------------------------------------

# 18. foreach()

In [None]:
#-------------------------------------------------------------------------

columns = ["Seqno", "Name"]

data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]

df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

# foreach() is an action: it calls the given function once per Row and
# returns nothing. The function runs in the Python worker processes, so
# NOTE(review): the printed lines presumably land on the workers' stdout
# (e.g. the terminal that launched the kernel), not in the cell output.
df.foreach(lambda x: print(f"{x.Seqno} --> {x.Name}"))


def func(row):
    """Print the Name field of a Row in upper case."""
    upper_name = row.Name.upper()
    print(upper_name)


df.foreach(func)

#-------------------------------------------------------------------------

# 19. fillna() and fill()

In [None]:
#-------------------------------------------------------------------------

# fillna() and na.fill() replace null (None) values with a given value,
# for example:
#   1. zero
#   2. an empty string
#   3. a space
#   4. any other constant literal
#
# DataFrame.fillna() and DataFrame.na.fill() are aliases of each other and
# return the same result. A replacement value is only applied to columns of
# a matching type: a numeric value fills numeric columns and a string value
# fills string columns. Columns of other types are left unchanged.

data = [
    (1,704,'STANDARD',None,'PR',30100),
    (2,704,None,'PASEO COSTA DEL SUR','PR',None),
    (3,709,None,'BDA SAN LUIS','PR',3700),
    (4,76166,'UNIQUE','CINGULAR WIRELESS','TX',84000),
    (5,76177,'STANDARD',None,'TX',None)
]

cols = ['id', 'zipcode', 'type', 'city', 'state', 'population']

df = spark.createDataFrame(data=data, schema=cols)
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 19.1 replace null or None values with 0 in numeric columns
# (Python ints are inferred as long; here only population contains nulls)

df.na.fill(value=0).show()
df.na.fill(value=0, subset=['population']).show()
df.fillna(value=0).show()

#-------------------------------------------------------------------------

# 19.2 replace null or None values in string columns
# (here only type and city contain nulls)

# the same value ('') for every string column
df.na.fill(value='').show()
# a different value per column, by chaining fill() calls with subset ...
df.na.fill(value='unknown', subset=['city']).na.fill(value='', subset=['type']).show()
# ... or with a single dict mapping column name -> replacement value
df.na.fill(value={'city': 'unknown', 'type': ''}).show()

#-------------------------------------------------------------------------

# 20. pivot()

In [None]:
#-------------------------------------------------------------------------

data = [
    ('apple', 1000, 'USA'),
    ('carrot', 1500, 'USA'),
    ('beans', 1600, 'USA'),
    ('orange', 2000, 'USA'),
    ('orange', 2000, 'USA'),
    ('apple', 400, 'CHINA'),
    ('carrot', 1200, 'CHINA'),
    ('beans', 1500, 'CHINA'),
    ('orange', 4000, 'CHINA'),
    ('apple', 2000, 'CANADA'),
    ('carrot', 2000, 'CANADA'),
    ('beans', 2000, 'MEXICO')
]

cols = ['product', 'amount', 'country']

df = spark.createDataFrame(data=data, schema=cols)
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 20.1 pivot spark dataframe

# Without an explicit list of pivot values, Spark first runs an extra job to
# collect the (sorted) distinct values of 'country' before pivoting.
pivot_df = df.groupBy('product').pivot('country').agg(sum("amount").alias("total_amount"))
pivot_df.show()

# pivot() is an expensive operation. Since Spark 2.0 its performance has been
# improved by using a two-phase aggregation. The same idea can be written out
# explicitly: aggregate per (product, country) first, then pivot the result.
# Passing the pivot values up front also avoids the extra distinct-values job.
# The list is kept sorted so the output columns match the version above.
countries = ['CANADA', 'CHINA', 'MEXICO', 'USA']

pivot_df = (df
            .groupBy('product', 'country')
            .agg(sum('amount').alias('total_amount'))
            .groupBy('product')
            .pivot('country', countries)
            .agg(sum('total_amount'))
           )
pivot_df.show()

#-------------------------------------------------------------------------

# 20.2 unpivot spark dataframe

# The SQL stack(n, 'CANADA', `CANADA`, ...) function turns the n country
# columns back into (country, total_amount) rows. The arguments are built
# from the same `countries` list used for the pivot, so the two stay in sync.
stack_args = ", ".join(f"'{c}', `{c}`" for c in countries)
unpivot_df = pivot_df.select('product',
                             expr(f"stack({len(countries)}, {stack_args}) as (country, total_amount)")
                            )
# products with no sales in a country produce null amounts; drop those rows
unpivot_df = unpivot_df.filter(col("total_amount").isNotNull()).orderBy("country")
unpivot_df.show()

#-------------------------------------------------------------------------

# 21. ArrayType()

In [None]:
#-------------------------------------------------------------------------

data = [
    ("James,,Smith", ["Java", "Scala", "C++"], ["Spark", "Java"], "OH", "CA"),
    ("Michael,Rose,", ["Spark", "Java", "C++"], ["Spark", "Java"], "NY", "NJ"),
    ("Robert,,Williams", ["CSharp", "VB"], ["Spark", "Python"], "UT", "NV"),
]

# name / currentState / previousState are plain strings, the two
# languages* columns are arrays of strings; every field is nullable.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("languagesAtSchool", ArrayType(StringType()), True),
    StructField("languagesAtWork", ArrayType(StringType()), True),
    StructField("currentState", StringType(), True),
    StructField("previousState", StringType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()

#-------------------------------------------------------------------------

# 21.1 explode()
# one output row per element of languagesAtSchool; the exploded column
# gets the default name "col"

df.select(df["name"], explode(df["languagesAtSchool"])).show()

#-------------------------------------------------------------------------

# 21.2 split()
# turn the comma-separated name string into an array of strings

df.select(split(df["name"], ",").alias("nameAsArray")).show()

#-------------------------------------------------------------------------

# 21.3 array()
# array() builds a new array column by merging several existing columns;
# the inputs should share a data type (both state columns are strings).

df.select(df["name"], array(df["currentState"], df["previousState"]).alias("states")).show()

#-------------------------------------------------------------------------

# 21.4 array_contains()
# boolean column that is true when languagesAtSchool contains "Java"

df.select(df["name"], array_contains(df["languagesAtSchool"], "Java")).show()

#-------------------------------------------------------------------------

# 22. MapType()

In [None]:
#-------------------------------------------------------------------------

dataDictionary = [
    ('James', {'hair': 'black', 'eye': 'brown'}),
    ('Michael', {'hair': 'brown', 'eye': None}),
    ('Robert', {'hair': 'red', 'eye': 'black'}),
    ('Washington', {'hair': 'grey', 'eye': 'grey'}),
    ('Jefferson', {'hair': 'brown', 'eye': ''}),
]

# 'properties' is a nullable map<string, string> column
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(), StringType()), True),
])

df = spark.createDataFrame(data=dataDictionary, schema=schema)
df.printSchema()
df.show(truncate=False)

#-------------------------------------------------------------------------

# 22.1 extract key and value into separate columns

# (a) through the RDD API: build (name, eye, hair) tuples from each Row,
#     then convert back to a DataFrame
df1 = (df.rdd
       .map(lambda row: (row.name, row.properties["eye"], row.properties["hair"]))
       .toDF(["name", "eye", "hair"]))
df1.show()

# (b) with Column.getItem()
df1 = (df
       .withColumn("eye", df.properties.getItem("eye"))
       .withColumn("hair", df.properties.getItem("hair")))
df1.show()

# (c) with bracket indexing on the map column
df1 = (df
       .withColumn("eye", df.properties["eye"])
       .withColumn("hair", df.properties["hair"]))
df1.show()

#-------------------------------------------------------------------------

# 22.2 explode()
# one row per map entry, split into "key" and "value" columns

df.select(df["name"], explode(df["properties"])).show()

#-------------------------------------------------------------------------

# 22.3 map_keys()
# array of the map's keys for each row

df.select(df["name"], map_keys(df["properties"])).show()

#-------------------------------------------------------------------------

# 22.4 map_values()
# array of the map's values for each row

df.select(df["name"], map_values(df["properties"])).show()

#-------------------------------------------------------------------------