# Creating DataFrameReader

In [0]:
# `spark` is not defined in this notebook; it is presumably injected by the runtime
# (Databricks). Display its class to confirm the session object is available.
type(spark)


In [0]:
# `spark.read` is the entry point for loading data into DataFrames; show the class
# of the object it returns. The bare last expression is rendered as the cell output.
dfr = spark.read
type(dfr)

# Create DataFrame from different sources

## Create DataFrame from CSV

In [0]:
# Load the users CSV: the first line supplies the column names, and Spark makes an
# extra pass over the data to infer each column's type.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv")
)
df.printSchema()
df.display()

## Create DataFrame from a delimited file

In [0]:
# A pipe-delimited file goes through the same CSV reader; only the separator changes.
df = (
    spark.read
    .options(header=True, inferSchema=True, sep="|")
    .csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.dat")
)
df.printSchema()
df.display()

## Create DataFrame from JSON

In [0]:
# multiLine lets one JSON record span several lines instead of requiring
# one record per line.
(
    spark.read
    .option("multiLine", True)
    .json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_004.json")
    .display()
)

# Custom Schema

In [0]:
from pyspark.sql.types import *

# Explicit schema for the users file. Column names and types are fixed up front,
# so no inferSchema pass is needed.
USER_SCHEMA = StructType(
    [
        StructField("id", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gen", StringType()),
        StructField("designation", StringType()),
        StructField("salary", IntegerType()),
    ]
)
# header=True is still required: this file's first line holds column names (the
# other reads of users_001.csv in this notebook also pass header=True). Without it
# that line is parsed as a data row, and its integer columns come back as null.
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    schema=USER_SCHEMA,
    header=True,
)
df.printSchema()
df.display()

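# Handling Bad Records
The `mode` read option controls what Spark does with records it cannot parse:
- `PERMISSIVE` (default): keep the record and set the fields that cannot be parsed to null
- `DROPMALFORMED`: drop malformed records from the result
- `FAILFAST`: raise an error on the first malformed record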

## permissive

In [0]:
# PERMISSIVE (Spark's default parse mode): malformed records are kept, and the
# fields that cannot be parsed are set to null.
(
    spark.read
    .option("mode", "PERMISSIVE")
    .json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json")
    .display()
)

## DropMalformed


In [0]:
# DROPMALFORMED: records that cannot be parsed are dropped from the result.
# Parse-mode names are case-insensitive; upper case matches the other cells here.
spark.read.json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
    mode="DROPMALFORMED",
).display()

## FailFast


In [0]:
# FAILFAST: the read raises an error on the first malformed record it meets
# (expected here if access_logs.json contains bad records).
(
    spark.read
    .option("mode", "FAILFAST")
    .json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json")
    .display()
)

# DataFrame Writer API

## Convert CSV into JSON


In [0]:
# Read the users CSV (header row + inferred types) so it can be re-written as JSON.
# `df` is reused by the save-mode cells below.
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)

# Write the rows out as JSON. mode="overwrite" keeps this cell re-runnable: the
# default save mode (errorifexists) would fail on every run after the first, and
# without the write the read-back below fails on a fresh environment.
df.write.save(
    format="JSON",
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
    mode="overwrite",
)

# Read the JSON output back to confirm the conversion.
spark.read.json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
).display()

# Save modes

## Overwrite

In [0]:
# overwrite: replace whatever is already at the target path with this DataFrame.
df.write.mode("overwrite").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)

## Ignore

In [0]:
# ignore: if the target path already exists, the write is skipped silently and
# the existing data is left untouched.
df.write.mode("ignore").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)

## errorifexists

In [0]:
# errorifexists (Spark's default save mode) refuses to write when the target path
# already exists. Earlier cells have written output_json, so this write is expected
# to fail. Catch and show the error so Restart & Run All can continue past this demo.
from pyspark.errors import AnalysisException

try:
    df.write.save(
        format="JSON",
        path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
        mode="errorifexists",
    )
except AnalysisException as err:
    print(f"Write refused (mode='errorifexists'): {err}")

## append

In [0]:
# Re-load the users CSV so this cell does not depend on earlier cells' `df`.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv")
)

# append: add this DataFrame's rows alongside the data already at the target path
# (each run adds another copy).
df.write.mode("append").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)