# **Read Data in Pyspark:**

In [4]:
# Basic SparkSession creation

from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName("Read-Data").getOrCreate()

# ## What happens internally?
# Checks if a SparkSession already exists
# If yes → returns it
# If no → creates a new one
# That’s why it’s called getOrCreate().

In [5]:
# Full-featured SparkSession (Production-style)
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Read-Data-Application")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.adaptive.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:
data = [(1, "Piyush", 177.50), (2, "Bob", 175.00)]
df = spark.createDataFrame(data, ["id", "name", "height"])

df.show()
df.printSchema()


+---+------+------+
| id|  name|height|
+---+------+------+
|  1|Piyush| 177.5|
|  2|   Bob| 175.0|
+---+------+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- height: double (nullable = true)



In [7]:
df_csv = (spark.read
          .format("csv")
          .option("header", "true")                # first row as header
          .option("inferSchema", "true")           # let Spark infer types (ok for small files)
          .option("sep", ",")                      # delimiter
          .option("quote", '"')                    # quote char
          .option("escape", '"')                   # escape char
          .option("multiLine", "true")             # multi-line fields
          .option("ignoreLeadingWhiteSpace", "true")
          .option("ignoreTrailingWhiteSpace", "true")
          .option("mode", "PERMISSIVE")            # PERMISSIVE | DROPMALFORMED | FAILFAST
          .option("encoding", "UTF-8")
          .option("nullValue", "")
          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
          .option("dateFormat", "yyyy-MM-dd")
          .option("pathGlobFilter", "*.csv")
          .option("recursiveFileLookup", "true")   # read nested folders
          .load(r"D:\GitLocal\Spark-The-Definitive-Guide\data\flight-data\csv\2015-summary.csv")).show()


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [8]:
df_parquet = (spark.read
              .format("parquet")
              .option("mergeSchema", "true")         # merge evolved schemas
              .option("pathGlobFilter", "*.parquet")
              .option("recursiveFileLookup", "true")
              .load(r"D:\GitLocal\big_data\Databricks\databricks-masterclass\data\shoppinginvoices\invoices_201_99457.parquet"))\
              .show()


+-----------+----------+------+---+---------------+--------+-------+--------------+------------+-----------------+-------------+
|customer_id|invoice_no|gender|age|       category|quantity|  price|payment_method|invoice_date|    shopping_mall|_rescued_data|
+-----------+----------+------+---+---------------+--------+-------+--------------+------------+-----------------+-------------+
|        201|   I885979|Female| 26|       Clothing|       3| 900.24|    Debit Card|  2021-07-04|        Metrocity|         null|
|        202|   I810217|Female| 51|       Clothing|       3| 900.24|          Cash|  2022-01-14|        Metrocity|         null|
|        203|   I499170|Female| 38|           Toys|       1|  35.84|   Credit Card|  2022-02-20|           Kanyon|         null|
|        204|   I792963|Female| 59|       Clothing|       5| 1500.4|    Debit Card|  2022-06-18|Emaar Square Mall|         null|
|        205|   I311151|Female| 39|       Souvenir|       3|  35.19|   Credit Card|  2022-04-27| 

In [9]:
df_json = (spark.read
           .format("json")
           .option("multiLine", "true")           # needed for pretty/indented JSON
           .option("primitivesAsString", "false")
           .option("allowUnquotedFieldNames", "false")
           .option("dropMalformed", "false")      # use 'mode' instead for Spark 3.x
           .option("mode", "PERMISSIVE")          # PERMISSIVE | DROPMALFORMED | FAILFAST
           .option("samplingRatio", "1.0")        # for inference
           .option("pathGlobFilter", "*.json")
           .option("recursiveFileLookup", "true")
           .load(r"D:\GitLocal\big_data\Dataset\flight-data2015.json"))\
           .show()


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

## **File reading Modes:**
1. PERMISSIVE (default): 
    - Loads all valid rows.
    - For malformed rows, Spark puts null in columns and stores the bad record in a special column called _corrupt_record.
    - For JSON, _corrupt_record is often auto-created
    - For CSV, it is not, when schema is provided
2. DROPMALFORMED → drops bad rows
3. FAILFAST → aborts on first bad row

### Supported File Formats for mode:
1. CSV
2. JSON
3. Text

In [10]:
from pyspark.sql.types import *

Data = [
    ("Piyush", 177.50, 30),
    ("Shivani", 166.75, 26),
    ("Akanksha", 150.00, 25)
]
Schema = StructType([
    StructField("Name", StringType(), False),
    StructField("Height", FloatType(), False),
    StructField("Age", IntegerType(), False)
    ])

df = spark.createDataFrame(data = Data, schema = Schema)
df.show()

+--------+------+---+
|    Name|Height|Age|
+--------+------+---+
|  Piyush| 177.5| 30|
| Shivani|166.75| 26|
|Akanksha| 150.0| 25|
+--------+------+---+



### **Numeric Types**

- **ByteType** → 8-bit integer (rarely inferred unless explicitly cast)
- **ShortType** → 16-bit integer
- **IntegerType** → 32-bit integer (common for whole numbers)
- **LongType** → 64-bit integer (used for large integers)
- **FloatType** → 32-bit floating point (less common; Spark usually picks Double)
- **DoubleType** → 64-bit floating point (default for Python `float` and decimal numbers)
- **DecimalType(precision, scale)** → For fixed-point decimals (inferred when reading from formats like Parquet/ORC with schema)

### **String & Binary**
- **StringType** → Text values
- **BinaryType** → Raw binary data (e.g., images, files)

### **Boolean**
- **BooleanType** → `true` / `false`

### **Date & Time**
- **DateType** → `yyyy-MM-dd`
- **TimestampType** → `yyyy-MM-dd HH:mm:ss` or ISO formats

### **Complex Types**
- **ArrayType(elementType)** → Lists or arrays
- **MapType(keyType, valueType)** → Key-value pairs
- **StructType(fields)** → Nested records (JSON objects)


# Reading CSV with Malformed records:

In [60]:
schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), False),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), False),
    StructField("count", IntegerType(), False),
    StructField("_corrupt_record", StringType(), True)
])

df_flight_csv = (spark.read
                    .format("csv")
                    .option("header", "true")                # first row as header
                    .schema(schema)           # let Spark infer types (ok for small files)
                    .option("sep", ",")                      # delimiter
                    .option("mode", "PERMISSIVE")            # PERMISSIVE | DROPMALFORMED | FAILFAST
                    .option("encoding", "UTF-8")
                    .load(r"D:\GitLocal\big_data\Spark\pyspark-masterclass\flightdata-malformed.csv"))\
                    
df_flight_csv.show(truncate=False)      
df_flight_csv.printSchema()                

+------------------------+-------------------+-----+---------------------------------------------+
|DEST_COUNTRY_NAME       |ORIGIN_COUNTRY_NAME|count|_corrupt_record                              |
+------------------------+-------------------+-----+---------------------------------------------+
|United States           |Romania            |15   |null                                         |
|United States           |Croatia            |1    |null                                         |
|United States           |Ireland            |344  |null                                         |
|Egypt                   |United States      |15   |null                                         |
|United States           |India              |62   |null                                         |
|United States           |Singapore          |1    |null                                         |
|United States           |Grenada            |62   |null                                         |
|Costa Ric

In [12]:
from pyspark.sql.functions import *

df_flight_csv.filter(col("_corrupt_record").isNotNull()).show(truncate=False)

+------------------------+-------------------+----+---------------------------------------------+
|DEST_COUNTRY_NAME       |ORIGIN_COUNTRY_NAME|age |_corrupt_record                              |
+------------------------+-------------------+----+---------------------------------------------+
|Senegal                 |United States      |null|Senegal,United States,40.10                  |
|Turks and Caicos Islands|United States      |null|Turks and Caicos Islands,United States,230.20|
|Luxembourg              |United States      |null|Luxembourg,United States,155.30              |
|Hong Kong               |United States      |null|Hong Kong,United States,332.40               |
|United States           |Guatemala          |null|United States,Guatemala,318.50               |
+------------------------+-------------------+----+---------------------------------------------+



##### Filter `df_flight_csv` to retrieve malformed records captured in **PERMISSIVE** mode from the `_corrupt_record` column.

In [62]:
schema2 = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), False),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), False),
    StructField("count", IntegerType(), False)
])

##### **FAILFAST:**
- First malformed record → ❌ job fails immediately

In [63]:
df_flight_csv_FAILFAST = (spark.read
                    .format("csv")
                    .option("header", "true")                # first row as header
                    .schema(schema2)           # let Spark infer types (ok for small files)
                    .option("sep", ",")                      # delimiter
                    .option("mode", "FAILFAST")            # PERMISSIVE | DROPMALFORMED | FAILFAST
                    .option("encoding", "UTF-8")
                    .load(r"D:\GitLocal\big_data\Spark\pyspark-masterclass\flightdata-malformed.csv"))\
                    
df_flight_csv_FAILFAST.show(truncate=False)      
df_flight_csv_FAILFAST.printSchema() 

Py4JJavaError: An error occurred while calling o695.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 58.0 failed 1 times, most recent failure: Lost task 0.0 in stage 58.0 (TID 73) (impetus-bl0672.impetus.co.in executor driver): org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING] Malformed records are detected in record parsing: [Senegal,United States,null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1772)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:74)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:457)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "40.10"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:366)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:308)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:453)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 24 more
Caused by: java.lang.NumberFormatException: For input string: "40.10"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
	at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:292)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:347)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2258)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2298)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3202)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4206)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4206)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3202)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3423)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:283)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:322)
	at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING] Malformed records are detected in record parsing: [Senegal,United States,null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1772)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:74)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:457)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "40.10"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:366)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:308)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:453)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 24 more
Caused by: java.lang.NumberFormatException: For input string: "40.10"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
	at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
	at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:292)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:190)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:347)
	... 27 more


##### **DROPMALFORMED:**
- Drop the row that contains marformed records.

In [64]:
df_flight_csv_DROPMALFORMED = (spark.read
                    .format("csv")
                    .option("header", "true")               # first row as header
                    .schema(schema2)                         # let Spark infer types (ok for small files)
                    .option("sep", ",")                     # delimiter
                    .option("mode", "DROPMALFORMED")             # PERMISSIVE | DROPMALFORMED | FAILFAST
                    .option("encoding", "UTF-8")
                    .load(r"D:\GitLocal\big_data\Spark\pyspark-masterclass\flightdata-malformed.csv"))\
                    
df_flight_csv_DROPMALFORMED.show(20, truncate=False)      
df_flight_csv_DROPMALFORMED.printSchema() 

+--------------------------------+-------------------+-----+
|DEST_COUNTRY_NAME               |ORIGIN_COUNTRY_NAME|count|
+--------------------------------+-------------------+-----+
|United States                   |Romania            |15   |
|United States                   |Croatia            |1    |
|United States                   |Ireland            |344  |
|Egypt                           |United States      |15   |
|United States                   |India              |62   |
|United States                   |Singapore          |1    |
|United States                   |Grenada            |62   |
|Costa Rica                      |United States      |588  |
|Moldova                         |United States      |1    |
|United States                   |Sint Maarten       |325  |
|United States                   |Marshall Islands   |39   |
|Guyana                          |United States      |64   |
|Malta                           |United States      |1    |
|Anguilla               

### **Note:** *When reading a CSV file in PySpark with an explicit schema, `_corrupt_record` does NOT appear automatically, even in PERMISSIVE mode. It must be explicitly defined in the schema.*

# Reading JSON with Malformed records:

In [65]:
schema = StructType([
    StructField("ORIGIN_COUNTRY_NAME", StringType(), False),
    StructField("DEST_COUNTRY_NAME", StringType(), False),
    StructField("count", IntegerType(), False)
    ,StructField("_corrupt_record", StringType(), True)

])

df_json = (spark.read
           .format("json")
           .schema(schema)
        #    .option("multiLine", "true")           # needed for pretty/indented JSON
        #    .option("dropMalformed", "false")      # use 'mode' instead for Spark 3.x
           .option("mode", "PERMISSIVE")            ## PERMISSIVE | DROPMALFORMED | FAILFAST
           .load(r"D:\GitLocal\big_data\Spark\pyspark-masterclass\flightdata-malformed.json"))

df_json.show(10, truncate=False)
df_json.printSchema()
df_json.filter(col("_corrupt_record").isNotNull()).show(truncate=False)

+-------------------+-----------------+-----+----------------------------------------------------------------------------------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|count|_corrupt_record                                                                   |
+-------------------+-----------------+-----+----------------------------------------------------------------------------------+
|Romania            |United States    |15   |null                                                                              |
|Croatia            |United States    |1    |null                                                                              |
|Ireland            |United States    |344  |null                                                                              |
|United States      |Egypt            |15   |null                                                                              |
|India              |United States    |62   |null                                                