In [0]:
'''
Using sort() function
Using orderBy() function
Ascending order
Descending order
SQL Sort functions

'''

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField,IntegerType,StringType

#-----------------------------------#

# Obtain the session through the builder so an already-active session
# (NOTE(review): presumably the one the Databricks runtime provides — confirm)
# is reused rather than constructing a second SparkSession by hand.
spark = SparkSession.builder.getOrCreate()
# Keep `sc` defined for any cell that needs the underlying SparkContext.
sc = spark.sparkContext


In [0]:
# Sample employee records: (name, department, state, salary, age, bonus).
simpleData = (
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
)

# Every field is nullable; three string columns followed by three integers.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("employee_name", StringType),
        ("department", StringType),
        ("state", StringType),
        ("salary", IntegerType),
        ("age", IntegerType),
        ("bonus", IntegerType),
    )
])

df = spark.createDataFrame(simpleData, schema)
df.show()

In [0]:
# sort() with plain column names (passed as a single list): ascending by default.
df.sort(["department", "state"]).show()

In [0]:
# Same sort, expressed with Column objects instead of names.
df.sort([col("department"), col("state")]).show()

In [0]:
# orderBy() with plain column names (passed as a single list).
df.orderBy(["department", "state"]).show()

In [0]:
# orderBy() with Column objects instead of names.
df.orderBy([col("department"), col("state")]).show()

In [0]:
# Explicit ascending order on both keys, using the asc() function form.
df.sort(asc("department"), asc("state")).show()

In [0]:
# Explicit ascending order on both keys via orderBy().
df.orderBy(asc("department"), asc("state")).show()

In [0]:
# Department ascending, then state descending within each department.
df.sort(asc("department"), desc("state")).show()

In [0]:
# Department ascending, state descending, using the `ascending` flag list.
df.orderBy(["department", "state"], ascending=[True, False]).show()

In [0]:
# asc()/desc() build sort-order expressions, which belong in sort()/orderBy()
# rather than in a projection. Select the columns first, then apply the ordering.
df.select("employee_name", "department", "state", "salary", "age", "bonus") \
    .orderBy(asc("department"), desc("state")) \
    .show()

In [0]:
# Register the employee DataFrame as temp view "EMP" so it can be queried with spark.sql().
df.createOrReplaceTempView("EMP")


In [0]:
# This cell runs Python (spark.sql), so it must not start with the %sql magic.
# Sorting in SQL is expressed with ORDER BY, not with asc()/desc() in the select list.
spark.sql(
    "select employee_name,department,state,salary,age,bonus from EMP "
    "order by department asc, state desc"
).show()

In [0]:
# First input for the union examples; note that this rebinds `simpleData`,
# `schema` and `df` from the sorting section.
simpleData = (
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
)

# Same six nullable columns as before: three strings, then three integers.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("employee_name", StringType),
        ("department", StringType),
        ("state", StringType),
        ("salary", IntegerType),
        ("age", IntegerType),
        ("bonus", IntegerType),
    )
])

df = spark.createDataFrame(simpleData, schema)
df.show()
 

In [0]:
# Second input for the union examples; the James and Maria rows also appear in `df`.
simpleData2 = (
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
)

# Reuses the `schema` built for the first union input.
df2 = spark.createDataFrame(simpleData2, schema)
df2.show()

In [0]:
# Append the rows of df2 to df. Per the PySpark docs, union() matches columns by
# position and keeps duplicate rows.
df3 = df.union(df2)
df3.show()

In [0]:
# unionAll() is documented by PySpark as an alias of union(); shown here for comparison.
df4 = df.unionAll(df2)
df4.show()

In [0]:
# Union followed by distinct() to remove the rows that appear in both inputs.
df5 = df.union(df2).distinct()
df5.show()

In [0]:
# File location and type
file_location = "/FileStore/tables/small.csv"
file_type = "csv"

# CSV options
infer_schema = "false"        # schema inference disabled
first_row_is_header = "true"  # the first line of small.csv holds the column names
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# The header option now comes from `first_row_is_header` instead of a hard-coded
# literal, so the declared setting and the actual read cannot disagree.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

id,zipcode,type,city,state,population
1,704,STANDARD,,PR,30100.0
2,704,,PASEO COSTA DEL SUR,PR,
3,709,,BDA SAN LUIS,PR,3700.0
4,76166,UNIQUE,CINGULAR WIRELESS,TX,84000.0
5,76177,STANDARD,,TX,


In [0]:
# Create a view or table
# Registers the CSV DataFrame under the name held in `temp_table_name`.

temp_table_name = "small_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:

# Drop every row containing at least one null (the default how="any").
df.dropna().show()

In [0]:
# `how` accepts "any" or "all"; "any" drops a row as soon as one column is null.
df.dropna(how="any").show()

In [0]:
# Drop a row only when every one of its columns is null.
df.dropna(how="all").show()

In [0]:
# Only nulls in the listed columns cause a row to be dropped.
df.dropna(subset=["population", "type"]).show()

In [0]:
# Section heading kept as a bare string literal; it has no effect beyond being this cell's value.
'''Spark – Split DataFrame single column into multiple columns'''

In [0]:
# Sample people: (comma-separated full name, birth year, gender, salary).
# Some names use ", " and others "," as the separator.
data = (
    ("James, A, Smith", "2018", "M", 3000),
    ("Michael, Rose, Jones", "2010", "M", 4000),
    ("Robert,K,Williams", "2010", "M", 4000),
    ("Maria,Anne,Jones", "2005", "F", 4000),
    ("Jen,Mary,Brown", "2010", "", -1),
)

# Three nullable string columns followed by a nullable integer salary.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("name", StringType),
        ("dob_year", StringType),
        ("gender", StringType),
        ("salary", IntegerType),
    )
])

df = spark.createDataFrame(data, schema)
df.show()

In [0]:
'''Split DataFrame column to multiple columns'''

# split() takes a regular expression. Swallowing the whitespace around each comma
# keeps values such as "James, A, Smith" from producing parts with a leading
# space (" A", " Smith").
name_parts = split(col("name"), r"\s*,\s*")

# Only the three derived columns are selected, so no "name" column remains to be
# dropped afterwards (the former trailing .drop("name") was a no-op).
df2 = df.select(
    name_parts.getItem(0).alias("FirstName"),
    name_parts.getItem(1).alias("MiddleName"),
    name_parts.getItem(2).alias("LastName"),
)

df2.printSchema()
df2.show()


In [0]:
'''Splitting column using withColumn'''

# split() takes a regular expression. Swallowing the whitespace around each comma
# keeps values such as "James, A, Smith" from producing parts with a leading space.
# The expression is built once and reused for all four derived columns.
name_parts = split(col("name"), r"\s*,\s*")

splitDF = (
    df.withColumn("FirstName", name_parts.getItem(0))
    .withColumn("MiddleName", name_parts.getItem(1))
    .withColumn("LastName", name_parts.getItem(2))
    .withColumn("NameArray", name_parts)  # the full array of name parts
    .drop("name")  # the original combined column is no longer needed
)
splitDF.printSchema()
splitDF.show()



In [0]:
'''Split DataFrame column using raw Spark SQL'''

# Register the frame as temp view "PERSON", then split `name` on commas in SQL.
df.createOrReplaceTempView("PERSON")
spark.sql("select SPLIT(name,',') as NameArray from PERSON").show()


In [0]:
# Section heading kept as a bare string literal; it has no effect beyond being this cell's value.
'''Spark – How to Concatenate DataFrame columns'''

In [0]:
# Starting point for the concatenation examples: the frame with the split name columns.
splitDF.show()

In [0]:
# Re-join the three name parts with literal commas and project only the result.
splitDF.select(
    concat(
        col("FirstName"), lit(','),
        col("MiddleName"), lit(','),
        col("LastName"),
    ).alias("FullName")
).show()

In [0]:
'''Using concat() Function on withColumn()'''

# Append FullName while keeping every existing column.
splitDF.withColumn(
    "FullName",
    concat(
        col("FirstName"), lit(','),
        col("MiddleName"), lit(','),
        col("LastName"),
    ),
).show()

In [0]:
# Append FullName, then remove the three columns it was built from.
(
    splitDF
    .withColumn(
        "FullName",
        concat(
            col("FirstName"), lit(','),
            col("MiddleName"), lit(','),
            col("LastName"),
        ),
    )
    .drop("FirstName", "MiddleName", "LastName")
    .show()
)

In [0]:
'''Using concat_ws() Function to Concatenate with Delimiter'''

# concat_ws() takes the delimiter once, followed by the columns to join.
(
    splitDF
    .withColumn(
        "FullName",
        concat_ws(",", col("FirstName"), col("MiddleName"), col("LastName")),
    )
    .drop("FirstName", "MiddleName", "LastName")
    .show()
)






In [0]:
''' Using Raw SQL'''

# Re-points the temp view name "EMP" at the split-name frame.
splitDF.createOrReplaceTempView("EMP")

# Build FullName in first / middle / last order, matching the DataFrame-API
# examples (the previous query placed LastName before MiddleName).
spark.sql(
    "select CONCAT(FirstName,' ',MiddleName,' ',LastName) as FullName from EMP"
).show()

In [0]:
 # Same concatenation using the SQL || operator, in first / middle / last order
 # (the previous query placed LastName before MiddleName).
 spark.sql(
     "select FirstName ||' '|| MiddleName ||' '|| LastName as FullName from EMP"
 ).show()

In [0]:
file_location = "/FileStore/tables/small.csv"
file_type = "csv"

# CSV options
infer_schema = "false"        # schema inference disabled
first_row_is_header = "true"  # the first line of small.csv holds the column names
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# The header option now comes from `first_row_is_header` instead of a hard-coded
# literal, so the declared setting and the actual read cannot disagree.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

df.show()

In [0]:
# Replace nulls with the string '0'. Per the PySpark docs a string fill value only
# applies to string-typed columns, not to integer/long ones.
# NOTE(review): the CSV is read with inferSchema disabled, so presumably every
# column is a string and all of them are affected — confirm.
df.na.fill('0').show()


In [0]:
# Fill nulls in the listed column only; keys are column names, values the replacements.
df.fillna({'population': 0}).show()

In [0]:
  # Replace nulls with an empty string.
  df.fillna("").show()

In [0]:
# Different replacement per column, supplied in a single mapping.
df.fillna({'city': 'unknown', 'type': ''}).show()


In [0]:
# Sample rows for the de-duplication examples; ("James", "Sales", 3000) appears twice.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# Two nullable string columns and a nullable integer salary.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ('employee_name', StringType),
        ('department', StringType),
        ('salary', IntegerType),
    )
])

# Note: `data` is rebound here from the earlier tuple of names to a DataFrame.
data = spark.createDataFrame(simpleData, schema)
data.show()

In [0]:
#DISTINCT
# distinct() removes rows that are identical across all columns.

distinctDF = data.distinct()
print("Distinct count: ",distinctDF.count())
distinctDF.show()

In [0]:
 # Without arguments, de-duplication compares all columns.
 df2 = data.drop_duplicates()
 print("Distinct count: ", df2.count())
 df2.show()

In [0]:

'''Spark Distinct of Multiple Columns'''
# Rows count as duplicates when both department and salary match.
dropDisDF = data.dropDuplicates(["department", "salary"])
print("Distinct count of department & salary : ", dropDisDF.count())
dropDisDF.show()

In [0]:
import databricks.koalas as ks

# Employee rows, in the column order given by `empColumns`.
emp = [
    (1, "Smith", -1, "2018", 10, "M", 3000),
    (2, "Rose", 1, "2010", 20, "M", 4000),
    (3, "Williams", 1, "2010", 10, "M", 1000),
    (4, "Jones", 2, "2005", 10, "F", 2000),
    (5, "Brown", 2, "2010", 30, "", -1),
    (6, "Brown", 2, "2010", 50, "", -1),
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "branch_id", "dept_id",
    "gender", "salary",
]

# Build the frame with Koalas and convert it straight to a Spark DataFrame.
empDF = ks.DataFrame(emp, columns=empColumns).to_spark()
empDF.show()

 

In [0]:
# Department rows, in the column order given by `deptColumns`.
dept = [
    ("Finance", 10, "2018"),
    ("Marketing", 20, "2010"),
    ("Marketing", 20, "2018"),
    ("Sales", 30, "2005"),
    ("Sales", 30, "2010"),
    ("IT", 50, "2010"),
]
deptColumns = ["dept_name", "dept_id", "branch_id"]

# Build the frame with Koalas and convert it straight to a Spark DataFrame.
deptDF = ks.DataFrame(dept, columns=deptColumns).to_spark()
deptDF.show()

 

In [0]:
# Inner join on both dept_id and branch_id.
# Columns are referenced with indexing (empDF["..."]) because a PySpark DataFrame is
# not callable, and each comparison is parenthesized because & binds tighter than ==.
empDF.join(
    deptDF,
    (empDF["dept_id"] == deptDF["dept_id"])
    & (empDF["branch_id"] == deptDF["branch_id"]),
    "inner",
).show()

In [0]:
 # Using join with multiple columns in a where clause.
 # Rewritten from Scala syntax (===, &&, show(false)) to valid Python:
 # == for column equality, & with parenthesized operands, show(truncate=False).
 empDF.join(deptDF).where(
     (empDF["dept_id"] == deptDF["dept_id"])
     & (empDF["branch_id"] == deptDF["branch_id"])
 ).show(truncate=False)

In [0]:
# Using join with multiple columns in a filter clause.
# Rewritten from Scala syntax (===, &&, show(false)) to valid Python:
# == for column equality, & with parenthesized operands, show(truncate=False).
empDF.join(deptDF).filter(
    (empDF["dept_id"] == deptDF["dept_id"])
    & (empDF["branch_id"] == deptDF["branch_id"])
).show(truncate=False)

In [0]:
# Expose both frames to Spark SQL; "EMP" replaces any earlier view of that name.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Same two-key join expressed as a SQL query; only the employee columns are returned.
resultDF = spark.sql(
    "select e.* from EMP e, DEPT d "
    "where e.dept_id == d.dept_id and e.branch_id == d.branch_id"
)
resultDF.show()