In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local[*]").appName("Saprk UI").getOrCreate()

In [0]:
spark

## Reading Data from CSV files!
## Understanding background of Spark UI
## Handling Bad records
## Spark read modes (PERMISSIVE, DROPMALFORMED, FAILFAST)

In [0]:
df = spark.read.format("csv").option("header", True).option("inferschema", True).load("dbfs:/FileStore/tables/Strings/emp_csv.txt")

In [0]:
df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)



## # .option("header", True) -- checks for 1st line
## # 
## # .option("inferschema", True)
## # 
## # These options creates a two seperate jobs. spark checks for metadata so it reads the metadata and create a job. Inorder to reduce the spark job schema can be imposed manually and is one of the optimization technique.
## # ex: schema = age int, employee_id int
## # df = spark.read.format("csv).schema(schema).load("path)


In [0]:
df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

# HANDLING CORRUPTED RECORDS

## MODES:
## 
## : 3 type of reading mode into pyspark:
## 
## 1.Permissive Mode : its a default mode. if spark is not able to parse it due to dattype mismatch then make it as null without impacting other result, i.e only issue record are null others are fine.
## 
## 2.DROPMALFORMED : whatever record having parsing issue, i.e in any column there is issues it will totally ignore that record, it only show correct record.
## 
## 3.FailFast Mode : while reading data into this mode it will fail as soon as it encounters any record have parsing issues.

In [0]:
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, _corrupt_record string"

df_2 = spark.read.format("csv").option("header", True).schema(_schema).load("dbfs:/FileStore/tables/select&selectExpr/emp_new.txt")

In [0]:
df_2.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [0]:
df_2.show()

+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|     _corrupt_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|                null|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|                null|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|                null|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|                null|
|          5|          103|    Jack Chan| 40|  Male|60000.0|2013-04-01|                null|
|          6|          103|    Jill Wong| 32|Female|52000.0|2018-07-01|                null|
|          7|          101|James Johnson| 42|  Male|   null|2012-03-15|007,101,James Joh...|
|          8|          102|     Kate Kim| 29|Female|51000.0|2019-10-01

In [0]:
df_2.where("_corrupt_record is not null").show()

+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|     _corrupt_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          7|          101|James Johnson| 42|  Male|   null|2012-03-15|007,101,James Joh...|
|         11|          104|   David Park| 38|  Male|65000.0|      null|011,104,David Par...|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+



In [0]:
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_2_m = spark.read.format("csv").option("mode", "DROPMALFORMED").option("header", True).schema(_schema).load("dbfs:/FileStore/tables/select&selectExpr/emp_new.txt")

In [0]:
df_2_m.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)



# If we use dropmalformed the corrupted records will be eliminated, row 7 & 11 wwill be eliminated.

In [0]:
df_2_m.show()

+-----------+-------------+-----------+---+------+-------+----------+
|employee_id|department_id|       name|age|gender| salary| hire_date|
+-----------+-------------+-----------+---+------+-------+----------+
|          1|          101|   John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101| Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|  Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|  Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|  Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|  Jill Wong| 32|Female|52000.0|2018-07-01|
|          8|          102|   Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|          103|    Tom Tan| 33|  Male|58000.0|2016-06-01|
|         10|          104|   Lisa Lee| 27|Female|47000.0|2018-08-01|
|         12|          105| Susan Chen| 31|Female|54000.0|2017-02-15|
|         13|          106|  Brian Kim| 45|  Male|75000.0|2011-07-01|
|         14|       

## FAILFAST MODE

In [0]:
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_2_f = spark.read.format("csv").option("mode", "FAILFAST").option("header", True).schema(_schema).load("dbfs:/FileStore/tables/select&selectExpr/emp_new.txt")

In [0]:
df_2_f.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)



In [0]:
df_2_f.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-325482436491073>:1[0m
[0;32m----> 1[0m [43mdf_2_f[49m[38;5;241;43m.[39;49m[43mshow[49m[43m([49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5;241m.[39mlog_success(
[1;32m     50[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;32m     51