
# Introduction

1. The primary interface for creating a DataFrame is `DataFrameReader`, available as `spark.read`.

**Create DataFrameReader**

```python
dfr = spark.read
print(type(dfr))
```


https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession

2. `DataFrameReader` has several methods for creating a DataFrame from different sources, including `.csv`, `.json`, `.parquet`, `.orc`, and `.jdbc`.

https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader



# Create DataFrame from Different Sources

In [0]:
type(spark)

In [0]:
dfr = spark.read
type(dfr)

## 01. Create DF from CSV

In [0]:
employee_df = spark.read.csv("/FileStore/tables/employee.csv")
print(type(employee_df))

## 02. Create DF from JSON

In [0]:
employee_df = spark.read.json("/FileStore/tables/users_01.json")
print(type(employee_df))

## 03. Create DF from Database

In [0]:
spark.read.jdbc
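`spark.read.jdbc` takes a JDBC URL, a table name (or a parenthesized subquery with an alias), and a `properties` dict with connection settings; the JDBC driver jar must be on Spark's classpath. It also accepts `column`, `lowerBound`, `upperBound`, and `numPartitions` for parallel reads. The sketch below only wraps the call: the host, database, table, credentials, and helper names are hypothetical placeholders, and the PostgreSQL driver is just one example.

```python
def jdbc_properties(user, password, driver="org.postgresql.Driver"):
    """Connection properties for spark.read.jdbc (all values are placeholders)."""
    return {"user": user, "password": password, "driver": driver}


def read_employees(spark, url, table="public.employee", user="spark_reader", password="change-me"):
    """Hypothetical helper: load one table over JDBC into a DataFrame."""
    return spark.read.jdbc(url=url, table=table, properties=jdbc_properties(user, password))


# Example call (needs a reachable database and the driver jar on the classpath):
# employee_db_df = read_employees(spark, "jdbc:postgresql://db-host:5432/hr")
```

Keeping credentials out of the code (for example in a secret store) is preferable to the literal defaults used here for illustration.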

# Exploring DataFrameReader


**Reference**

* https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

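## Common options for `.csv`

* `header`: if `True`, use the first line as column names (default `False`)
* `inferSchema`: if `True`, infer column types from the data, which costs an extra pass over the input (default `False`, so every column is a string)
* `sep`: column separator (default `,`)
* `schema`: an explicit `StructType` (or DDL string) to use instead of inferring one
* `quote`: character used to quote values that contain the separator (default `"`)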

In [0]:
employee_df = spark.read.csv(
    path="/FileStore/tables/employee.csv", sep="|", header=True, quote="'", inferSchema=True
)
employee_df.show(n=5, truncate=False)

+------+------------+-------+-------+----------+-----------+---------+----------+--------------------+--------------------------+
|col_id|col_name    |col_exp|col_gen|col_dob   |col_company|col_desig|col_doj   |col_skills          |col_actual_expected_salary|
+------+------------+-------+-------+----------+-----------+---------+----------+--------------------+--------------------------+
|101   |Agastya     |1      |M      |1987-01-22|Infosys    |Developer|2015-01-21|Hadoop,PySpark,Kafka|1050000,1100000           |
|102   |Acyuta      |2      |F      |1987-03-29|TCS        |Team Lead|2016-01-21|C,C++,Java          |1050000,1100000           |
|103   |Anuvrata    |1      |M      |1987-01-22|Infosys    |Developer|2017-01-21|Java,Python,Hadoop  |1050000,1100000           |
|null  |Bhavika     |6      |F      |1987-01-22|Cisco      |Team Lead|2015-01-21|Hadoop,PySpark,Kafka|null                      |
|105   |Chitragandha|null   |M      |1987-01-22|CTS        |Developer|null      |C,C++,Jav

In [0]:
employee_df.printSchema()

root
 |-- col_id: integer (nullable = true)
 |-- col_name: string (nullable = true)
 |-- col_exp: integer (nullable = true)
 |-- col_gen: string (nullable = true)
 |-- col_dob: date (nullable = true)
 |-- col_company: string (nullable = true)
 |-- col_desig: string (nullable = true)
 |-- col_doj: date (nullable = true)
 |-- col_skills: string (nullable = true)
 |-- col_actual_expected_salary: string (nullable = true)



In [0]:
users_df = spark.read.csv(
    path="/FileStore/tables/users_001.csv", header=True, quote="'", inferSchema=True
)
users_df.show(n=5, truncate=False)

## Attaching Custom Schema

In [0]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

USER_SCHEMA = StructType([
  StructField("id", IntegerType()),
  StructField("full_name", StringType()),
  StructField("email", StringType()),
  StructField("gender", StringType()),
  StructField("country", StringType()),
  StructField("state", StringType()),
  StructField("city", StringType()),
  StructField("asset", IntegerType()),
  StructField("marital_status", StringType())
])

users_df = spark.read.csv(
    path="/FileStore/tables/users_001.csv", 
    header=True, 
    quote="'",
    schema=USER_SCHEMA
)


In [0]:
users_df.printSchema()


## Handling Corrupt Records

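There are three parse modes, set with `.option("mode", ...)`:

1. PERMISSIVE (default): keep malformed records, set the fields that cannot be parsed to null, and store the raw record in a corrupt-record column (`_corrupt_record` by default, renamed with `columnNameOfCorruptRecord`)
2. DROPMALFORMED: drop malformed records
3. FAILFAST: throw an exception as soon as a malformed record is found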

### PERMISSIVE

In [0]:
access_logs_df = spark.read.option("mode","PERMISSIVE").json(path="/FileStore/tables/access_logs.json")
access_logs_df.show()

In [0]:
access_logs_df = (
    spark.read.option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "rejected_records")
    .json(path="/FileStore/tables/access_logs.json")
)
access_logs_df.show()



### DROPMALFORMED

In [0]:
access_logs_df = spark.read.option("mode","DROPMALFORMED").json(path="/FileStore/tables/access_logs.json")
access_logs_df.show()

### FAILFAST

In [0]:
access_logs_df = spark.read.option("mode","FAILFAST").json(path="/FileStore/tables/access_logs.json")
access_logs_df.show()

# Exploring DataFrameWriter

**Reference**

https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

## Write as CSV

In [0]:
users_df.printSchema()

In [0]:
users_df.select("full_name").write.format("csv").option("header", "true").save(
    "/FileStore/tables/output_01"
)

In [0]:
spark.read.csv(path="/FileStore/tables/output_01", header=True).show(n=5)

## Write as Parquet

In [0]:
users_df.select("full_name").write.mode("overwrite").parquet(
    "/FileStore/tables/output_parquet_02"
)

In [0]:
dbutils.fs.ls("/FileStore/tables/output_parquet_02")

In [0]:
spark.read.parquet("/FileStore/tables/output_parquet_02").show(n=5)


## Write as JSON

In [0]:
users_df.select("full_name").write.json(
    "/FileStore/tables/output_json"
)
spark.read.json("/FileStore/tables/output_json").show()

In [0]:
spark.read.text("/FileStore/tables/output_json").show(truncate=False)

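## Save Modes

`DataFrameWriter.mode()` controls what happens when the target path already exists. There are four save modes:

1. `overwrite`: replace the existing data
2. `append`: add the new data to the existing data
3. `ignore`: silently skip the write if data already exists
4. `error` / `errorifexists` (default): raise an error if data already exists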

### Overwrite

### Append