In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]

df = spark.createDataFrame(data=data,schema=columns);
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



### Rename existing column

In [None]:
df=df.withColumnRenamed("firstname","FirstName")
df.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
## rename multiple columns at a time
from pyspark.sql.functions import *
df = df.select(col("FirstName"),
  col("middlename").alias("MiddleName"),
  col("lastname").alias("LastName"),
  col("dob").alias("DateOfBirth"),
  col("gender"),
  col("salary"));

df.show()

## we can also use multiple withcolumn to rename columns

+---------+----------+--------+-----------+------+------+
|FirstName|MiddleName|LastName|DateOfBirth|gender|salary|
+---------+----------+--------+-----------+------+------+
|    James|          |   Smith| 1991-04-01|     M|  3000|
|  Michael|      Rose|        | 2000-05-19|     M|  4000|
|   Robert|          |Williams| 1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones| 1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown| 1980-02-17|     F|    -1|
+---------+----------+--------+-----------+------+------+



### WithColumn() in depth

In [None]:
## change datatype of salary column
df = df.withColumn("salary",col("salary").cast("Integer"))
df.printSchema()

##update values of salary column
df = df.withColumn("salary",col("salary")*100);
df.show()

## create new column form existiing
df = df.withColumn("new_salary",col("salary")-1);
df.show()

## add a new column with existsing constant calues
df = df.withColumn("ratings",lit("****"));
df.show()

## drop a column

df=df.drop("ratings");
df.show()

root
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----------+------+------+
|FirstName|MiddleName|LastName|DateOfBirth|gender|salary|
+---------+----------+--------+-----------+------+------+
|    James|          |   Smith| 1991-04-01|     M|300000|
|  Michael|      Rose|        | 2000-05-19|     M|400000|
|   Robert|          |Williams| 1978-09-05|     M|400000|
|    Maria|      Anne|   Jones| 1967-12-01|     F|400000|
|      Jen|      Mary|   Brown| 1980-02-17|     F|  -100|
+---------+----------+--------+-----------+------+------+

+---------+----------+--------+-----------+------+------+----------+
|FirstName|MiddleName|LastName|DateOfBirth|gender|salary|new_salary|
+---------+----------+--------+-----------+------+------+----------+
|    James|          

## filter() funtion

In [None]:
df.filter(df.salary>=300000).show();
df.filter((df.FirstName.startswith("M"))& (df.gender=="M")).show();
## filter using like function
df.filter(df.DateOfBirth.like("19%")).show()

+---------+----------+--------+-----------+------+------+----------+
|FirstName|MiddleName|LastName|DateOfBirth|gender|salary|new_salary|
+---------+----------+--------+-----------+------+------+----------+
|    James|          |   Smith| 1991-04-01|     M|300000|    299999|
|  Michael|      Rose|        | 2000-05-19|     M|400000|    399999|
|   Robert|          |Williams| 1978-09-05|     M|400000|    399999|
|    Maria|      Anne|   Jones| 1967-12-01|     F|400000|    399999|
+---------+----------+--------+-----------+------+------+----------+

+---------+----------+--------+-----------+------+------+----------+
|FirstName|MiddleName|LastName|DateOfBirth|gender|salary|new_salary|
+---------+----------+--------+-----------+------+------+----------+
|  Michael|      Rose|        | 2000-05-19|     M|400000|    399999|
+---------+----------+--------+-----------+------+------+----------+

+---------+----------+--------+-----------+------+------+----------+
|FirstName|MiddleName|LastName|D

### orderby and sort

In [None]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
# Create SparkSession

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [None]:
df1 = df.sort("salary")
df1.show()
df1 = df.sort("salary","age",ascending=[False,True])
# both do same work the only diffrence is col() sorts acc to column type
# and the 1st one takes string type as column
df1 = df.sort(col("salary"),col("age"),ascending=[False,True])
df1.show()

df1 = df.orderBy(df.employee_name.asc(),df.state.asc())
df1.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Raman|   Finance|   CA| 99000| 40|24000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|

## orderBy()

In [None]:
df1 = df.orderBy("department")
df1.show()
df1 = df.orderBy(df.employee_name.asc(),df.state.asc())
df1.show()
df1 = df.orderBy(col("department").asc(),col("salary").desc())
df1.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|

### PySpark Explode Array and Map Columns to Rows

In [None]:
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})
        ]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-------------------+-----------------------------+
|name      |knownLanguages     |properties                   |
+----------+-------------------+-----------------------------+
|James     |[Java, Scala]      |{eye -> brown, hair -> black}|
|Michael   |[Spark, Java, null]|{eye -> null, hair -> brown} |
|Robert    |[CSharp, ]         |{eye -> , hair -> red}       |
|Washington|null               |null                         |
|Jefferson |[1, 2]             |{}                           |
+----------+-------------------+-----------------------------+



## explode will not map null values

In [None]:
## explode array
from pyspark.sql.functions import explode;

df1 = df.select(df.name,explode(df.knownLanguages))
df1.printSchema()
df1.show(truncate = False)

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|name     |col   |
+---------+------+
|James    |Java  |
|James    |Scala |
|Michael  |Spark |
|Michael  |Java  |
|Michael  |null  |
|Robert   |CSharp|
|Robert   |      |
|Jefferson|1     |
|Jefferson|2     |
+---------+------+



In [None]:
## explode map
df1 = df.select(df.name,explode(df.properties))
df1.printSchema()
df1.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
|name   |key |value|
+-------+----+-----+
|James  |eye |brown|
|James  |hair|black|
|Michael|eye |null |
|Michael|hair|brown|
|Robert |eye |     |
|Robert |hair|red  |
+-------+----+-----+



## explode_outer() it is same as explode() but it will map null values too

In [None]:
## performing epxlode on array
df1 = df.select(df.name,explode_outer(df.knownLanguages))
df1.show()

## performing explode on map
df1 = df.select(df.name,explode_outer(df.properties))
df1.show()

+----------+------+
|      name|   col|
+----------+------+
|     James|  Java|
|     James| Scala|
|   Michael| Spark|
|   Michael|  Java|
|   Michael|  null|
|    Robert|CSharp|
|    Robert|      |
|Washington|  null|
| Jefferson|     1|
| Jefferson|     2|
+----------+------+

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|     |
|    Robert|hair|  red|
|Washington|null| null|
| Jefferson|null| null|
+----------+----+-----+



## posexplode() same as explode() the only diffrence is that it also returns the position of array or map element

In [None]:
## using posexplode with array
df1 = df.select(df.name,posexplode(df.knownLanguages))
df1.show()

## using posexplode() on map
df1 = df.select(df.name,posexplode(df.properties))
df1.show()

+---------+---+------+
|     name|pos|   col|
+---------+---+------+
|    James|  0|  Java|
|    James|  1| Scala|
|  Michael|  0| Spark|
|  Michael|  1|  Java|
|  Michael|  2|  null|
|   Robert|  0|CSharp|
|   Robert|  1|      |
|Jefferson|  0|     1|
|Jefferson|  1|     2|
+---------+---+------+

+-------+---+----+-----+
|   name|pos| key|value|
+-------+---+----+-----+
|  James|  0| eye|brown|
|  James|  1|hair|black|
|Michael|  0| eye| null|
|Michael|  1|hair|brown|
| Robert|  0| eye|     |
| Robert|  1|hair|  red|
+-------+---+----+-----+



## posexplode_outer()

In [None]:
## using posexplode_outer() with array
df1 = df.select(df.name,posexplode_outer(df.knownLanguages))
df1.show()

## using posexplode() on map
df1 = df.select(df.name,posexplode_outer(df.properties))
df1.show()
df.show()

+----------+----+------+
|      name| pos|   col|
+----------+----+------+
|     James|   0|  Java|
|     James|   1| Scala|
|   Michael|   0| Spark|
|   Michael|   1|  Java|
|   Michael|   2|  null|
|    Robert|   0|CSharp|
|    Robert|   1|      |
|Washington|null|  null|
| Jefferson|   0|     1|
| Jefferson|   1|     2|
+----------+----+------+

+----------+----+----+-----+
|      name| pos| key|value|
+----------+----+----+-----+
|     James|   0| eye|brown|
|     James|   1|hair|black|
|   Michael|   0| eye| null|
|   Michael|   1|hair|brown|
|    Robert|   0| eye|     |
|    Robert|   1|hair|  red|
|Washington|null|null| null|
| Jefferson|null|null| null|
+----------+----+----+-----+

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, 

In [None]:
arrayArrayData = [
  ("James",[["Java","Scala","C++"],["Spark","Java"]]),
  ("Michael",[["Spark","Java","C++"],["Spark","Java"]]),
  ("Robert",[["CSharp","VB"],["Spark","Python"]])
]

df = spark.createDataFrame(data=arrayArrayData, schema = ['name','subjects'])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- subjects: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-------+-----------------------------------+
|name   |subjects                           |
+-------+-----------------------------------+
|James  |[[Java, Scala, C++], [Spark, Java]]|
|Michael|[[Spark, Java, C++], [Spark, Java]]|
|Robert |[[CSharp, VB], [Spark, Python]]    |
+-------+-----------------------------------+



In [None]:
df.select(df.name,explode(df.subjects)).show()
df.select(df.name,flatten(df.subjects)).show(truncate=False)

+-------+------------------+
|   name|               col|
+-------+------------------+
|  James|[Java, Scala, C++]|
|  James|     [Spark, Java]|
|Michael|[Spark, Java, C++]|
|Michael|     [Spark, Java]|
| Robert|      [CSharp, VB]|
| Robert|   [Spark, Python]|
+-------+------------------+

+-------+-------------------------------+
|name   |flatten(subjects)              |
+-------+-------------------------------+
|James  |[Java, Scala, C++, Spark, Java]|
|Michael|[Spark, Java, C++, Spark, Java]|
|Robert |[CSharp, VB, Spark, Python]    |
+-------+-------------------------------+



### groupBy()

In [None]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [None]:
# calculate sum of salaies in departments
df.groupBy("department").sum("salary").show()

# calculate number of employee in each department
df.groupBy("department").count().show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|     351000|
| Marketing|     171000|
+----------+-----------+

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [None]:
import logging
logging.getLogger().setLevel(logging.INFO)

def function():
  try:
    df = spark.read.csv("/content/sample_data/california_housing_test.csv",header =True)
    df.show()
    logging.info("Success")
  except Exception as e:
    logging.info("no success")
  finally:

    logging.info("finally block")
function()

INFO:root:Success
INFO:root:finally block


+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000| 237.000000|     2.937500|    