In [21]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

# Actors

In [22]:
# filepath = "dbfs:/FileStore/tables/Files/actors.csv"
filepath = "actors.csv"
df = spark.read.format("csv") \
    .option("inferSchema", "True") \
    .option("header", "True") \
    .load(filepath)

display(df)

                                                                                

DataFrame[imdb_title_id: string, ordering: int, imdb_name_id: string, category: string, job: string, characters: string]

**See current schema**

In [23]:
df.printSchema()

root
 |-- imdb_title_id: string (nullable = true)
 |-- ordering: integer (nullable = true)
 |-- imdb_name_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



**Create new schema**

In [24]:
schema = "title STRING, order INT, name STRING, category STRING, job STRING, characters STRING"

**Read DataFrame with new schema**

In [25]:
df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "True") \
    .load(filepath)

display(df)

DataFrame[title: string, order: int, name: string, category: string, job: string, characters: string]

#### Ex. 3
**Create json by hand**

In [34]:
json = '{"title": "tt0000009","order": 1,"name": "nm0063086","category": "actress","job": "null","characters": "[Miss Geraldine Holbrook (Miss Jerry)]"}'
df_json = spark.read.json(sc.parallelize([json]))

In [35]:
df_json.show()

+--------+--------------------+----+---------+-----+---------+
|category|          characters| job|     name|order|    title|
+--------+--------------------+----+---------+-----+---------+
| actress|[Miss Geraldine H...|null|nm0063086|    1|tt0000009|
+--------+--------------------+----+---------+-----+---------+



**or write DataFrame to json and read again**

In [36]:
# df.select("title", "name").write.save("title_name.json", format="json")

In [37]:
# df_json = spark.read.format("json").json("title_name.json")

# df_json.show()


#### Ex. 4
**Damaging the data**

In [52]:
json = '"title": "tt0000009","order" 1,"name": "nm0063086","category": "actress" "job": "null","characters": "[Miss Geraldine Holbrook (Miss Jerry)]"}'

In [53]:
df_1 = spark.read.schema(schema) \
    .option("header", "True") \
    .option("badRecordsPath", "/mnt/sources/badrecords") \
    .json(sc.parallelize([json]))

df_1.show()

+-----+-----+----+--------+----+----------+
|title|order|name|category| job|characters|
+-----+-----+----+--------+----+----------+
| null| null|null|    null|null|      null|
+-----+-----+----+--------+----+----------+



In [54]:
df_2 = spark.read.schema(schema) \
    .option("header", "True") \
    .option("mode", "PERMISSIVE") \
    .json(sc.parallelize([json]))

df_2.show()

+-----+-----+----+--------+----+----------+
|title|order|name|category| job|characters|
+-----+-----+----+--------+----+----------+
| null| null|null|    null|null|      null|
+-----+-----+----+--------+----+----------+



In [55]:
df_3 = spark.read.schema(schema) \
    .option("header", "True") \
    .option("mode", "DROPMALFORMED") \
    .json(sc.parallelize([json]))

df_3.show()

+-----+-----+----+--------+---+----------+
|title|order|name|category|job|characters|
+-----+-----+----+--------+---+----------+
+-----+-----+----+--------+---+----------+



In [56]:
df_4 = spark.read.schema(schema) \
    .option("header", "True") \
    .option("mode", "FAILFAST") \
    .json(sc.parallelize([json]))

df_4.show()

23/03/19 17:37:58 ERROR Executor: Exception in task 0.0 in stage 21.0 (TID 21)
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$12(DataFrameReader.scala:430)
	at org.apache.spark.sql.DataFrameReader$$Lambda$3357/1880042285.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedR

Py4JJavaError: An error occurred while calling o577.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 21) (192.168.0.171 executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$12(DataFrameReader.scala:430)
	at org.apache.spark.sql.DataFrameReader$$Lambda$3357/1880042285.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2523/1015148357.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2520/455397224.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2482/1590036297.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Failed to parse field name null, field value title, [VALUE_STRING] to target spark data type [StructType(StructField(title,StringType,true),StructField(order,IntegerType,true),StructField(name,StringType,true),StructField(category,StringType,true),StructField(job,StringType,true),StructField(characters,StringType,true))].
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:513)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$10(DataFrameReader.scala:426)
	at org.apache.spark.sql.DataFrameReader$$Lambda$3356/1771540971.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 24 more
Caused by: java.lang.RuntimeException: Failed to parse field name null, field value title, [VALUE_STRING] to target spark data type [StructType(StructField(title,StringType,true),StructField(order,IntegerType,true),StructField(name,StringType,true),StructField(category,StringType,true),StructField(job,StringType,true),StructField(characters,StringType,true))].
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failToParseValueForDataTypeError(QueryExecutionErrors.scala:1159)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:406)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:389)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:377)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$Lambda$3105/1482905720.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:501)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$Lambda$3115/284009358.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:496)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3185/1552265644.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$3183/1085910077.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset$$Lambda$1581/547834026.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.Dataset$$Lambda$1915/1535253222.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset$$Lambda$1582/2128262915.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1593/16354643.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1583/1813565250.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$12(DataFrameReader.scala:430)
	at org.apache.spark.sql.DataFrameReader$$Lambda$3357/1880042285.apply(Unknown Source)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2523/1015148357.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2520/455397224.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2482/1590036297.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Failed to parse field name null, field value title, [VALUE_STRING] to target spark data type [StructType(StructField(title,StringType,true),StructField(order,IntegerType,true),StructField(name,StringType,true),StructField(category,StringType,true),StructField(job,StringType,true),StructField(characters,StringType,true))].
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:513)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$10(DataFrameReader.scala:426)
	at org.apache.spark.sql.DataFrameReader$$Lambda$3356/1771540971.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 24 more
Caused by: java.lang.RuntimeException: Failed to parse field name null, field value title, [VALUE_STRING] to target spark data type [StructType(StructField(title,StringType,true),StructField(order,IntegerType,true),StructField(name,StringType,true),StructField(category,StringType,true),StructField(job,StringType,true),StructField(characters,StringType,true))].
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failToParseValueForDataTypeError(QueryExecutionErrors.scala:1159)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:406)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:389)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:377)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:101)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$Lambda$3105/1482905720.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:501)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$Lambda$3115/284009358.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2764)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:496)
	... 27 more


In [57]:
spark.stop()