In [4]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading from CSV Files")
    .master("local[*]")
    .getOrCreate()
)

spark

In [6]:
# Read a csv file into dataframe

df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("datasets/emp.csv")


In [7]:
df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)



In [10]:
df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [12]:
# Reading with Schema
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_schema = spark.read.format("csv").option("header",True).schema(_schema).load("datasets/emp.csv")


In [14]:
df_schema.show()

+-----------+-------------+-------------+---+------+-------+----------+
|employee_id|department_id|         name|age|gender| salary| hire_date|
+-----------+-------------+-------------+---+------+-------+----------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|
|          7|          101|James Johnson| 42|  Male|70000.0|2012-03-15|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female|47000.0|2018-08-01|
|         11|          104|   David Park| 38|  Male|65000.0|2015

In [16]:
# Handle BAD records - PERMISSIVE (Default mode)

_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, bad_record string"

df_p = spark.read.format("csv").schema(_schema).option("columnNameOfCorruptRecord", "bad_record").option("header", True).load("datasets/emp_new.csv")


In [18]:
df_p.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- bad_record: string (nullable = true)



In [20]:
df_p.show()

+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|          bad_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|                NULL|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|                NULL|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|                NULL|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|                NULL|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|                NULL|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|                NULL|
|          7|          101|James Johnson| 42|  Male|   NULL|2012-03-15|007,101,James Joh...|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01

In [26]:
# Handle BAD records - DROPMALFORMED
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_m = spark.read.format("csv").option("header", True).option("mode", "DROPMALFORMED").schema(_schema).load("datasets/emp_new.csv")


In [28]:
df_m.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)



In [30]:
df_m.show()

+-----------+-------------+-----------+---+------+-------+----------+
|employee_id|department_id|       name|age|gender| salary| hire_date|
+-----------+-------------+-----------+---+------+-------+----------+
|          1|          101|   John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101| Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|  Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|  Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|  Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|  Jill Wong| 32|Female|52000.0|2018-07-01|
|          8|          102|   Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|    Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|   Lisa Lee| 27|Female|47000.0|2018-08-01|
|         12|          105| Susan Chen| 31|Female|54000.0|2017-02-15|
|         13|          106|  Brian Kim| 45|  Male|75000.0|2011-07-01|
|         14|       

In [None]:
# Handle BAD records - FAILFAST

_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_m = spark.read.format("csv").option("header", True).option("mode", "FAILFAST").schema(_schema).load("data/input/emp_new.csv")



In [32]:
df_m.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)



In [None]:
df_m.show()

In [34]:
# BONUS TIP
# Multiple options

_options = {
    "header" : "true",
    "inferSchema" : "true",
    "mode" : "PERMISSIVE"
}

df = (spark.read.format("csv").options(**_options).load("datasets/emp.csv"))


In [36]:
df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [38]:
spark.stop()