# Create DataFrame from different sources

## Create DF from CSV

In [0]:
# Load every file matching *.csv in the csv_format folder. header=True takes the
# column names from the first line of each file instead of treating it as data.
users_df = (
    spark.read
    .option("header", True)
    .csv("dbfs:/FileStore/synechron/users/csv_format/*.csv")
)
# Preview only the first four rows rather than rendering the whole DataFrame.
users_df.limit(4).display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,Technician,43537


## Create DF from Delimited File

In [0]:
# The .dat file is pipe-delimited, so the CSV reader is reused with a custom
# separator; the first line still supplies the column names.
users_df = (
    spark.read
    .option("sep", "|")
    .option("header", True)
    .csv("dbfs:/FileStore/synechron/users/delimited_format/users_001.dat")
)
# Preview the first four rows.
users_df.limit(4).display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537


## Create DF from JSON

In [0]:
# Read the JSON file through the generic format/load API and preview four rows.
(
    spark.read
    .format("json")
    .load("dbfs:/FileStore/synechron/users/json_format/users_001.json")
    .limit(4)
    .display()
)

age,designation,gender,id,salary
0,technician,M,1,85711
53,other,F,2,94043
23,writer,M,3,32067
0,technician,M,4,43537


## Create DF from Parquet

In [0]:
# Name the format explicitly so the read never depends on the session's
# default data source, then preview four rows.
(
    spark.read
    .format("parquet")
    .load("dbfs:/FileStore/synechron/users/parquet_format/users.parquet")
    .limit(4)
    .display()
)

id,age,gender,designation,salary
1,0,M,technician,85711
2,53,F,other,94043
3,23,M,writer,32067
4,0,M,technician,43537


# Exploring DataFrameReader

## Important Parameters

### Common Options for .csv

In [0]:
# inferSchema=True asks Spark to derive column types from the data; without it
# the CSV reader returns every column as a string.
users_df = (
    spark.read
    .options(sep="|", header=True, inferSchema=True)
    .csv("dbfs:/FileStore/synechron/users/delimited_format/users_001.dat")
)
# Show the inferred column names and types.
users_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gen: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: integer (nullable = true)



### Common Options for .json

In [0]:
# multiLine=True lets a single JSON record span several lines instead of
# requiring one complete JSON object per line.
(
    spark.read
    .option("multiLine", True)
    .json("dbfs:/FileStore/synechron/users/json_format/users_003.json")
    .limit(4)
    .display()
)

age,designation,gender,id,salary
0,technician,M,1,85711
53,other,F,2,94043


## Attach Custom Schema

In [0]:
# NOTE(review): wildcard import kept unchanged; other cells may rely on names it provides.
from pyspark.sql.types import *

# Explicit schema applied instead of inference. Fields are listed in file-column
# order; every field keeps the default nullable=True.
USER_SCHEMA = (
    StructType()
    .add("user_id", IntegerType())
    .add("age", IntegerType())
    .add("gen", StringType())
    .add("designation", StringType())
    .add("salary", IntegerType())
)

# With a schema supplied, the resulting column names come from USER_SCHEMA
# (the first column is exposed as "user_id").
users_df = (
    spark.read
    .options(sep="|", header=True)
    .csv(
        "dbfs:/FileStore/synechron/users/delimited_format/users_001.dat",
        schema=USER_SCHEMA,
    )
)
users_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gen: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: integer (nullable = true)



## Handling Bad Records

### PERMISSIVE (Default)

In [0]:
# PERMISSIVE keeps malformed records: the raw text of a record that fails to
# parse is stored in the "bad_records" column and its other fields are left null.
spark.read.json(
    "/FileStore/synechron/access_logs.json",
    mode="PERMISSIVE",
    columnNameOfCorruptRecord="bad_records",
).display()

bad_records,email,first_name,gender,id,ip_address,last_name
,jpenddreth0@census.gov,Jeanette,Female,1.0,26.58.193.2,Penddreth
,gfrediani1@senate.gov,Giavani,Male,2.0,229.179.4.212,Frediani
,nbea2@imageshack.us,Noell,Female,3.0,180.66.162.255,Bea
"{""id"": 4,""first_name"": {,""last_name"": ""Valek"",""email"": ""wvalek3@vk.com"",""gender"": ""Male"",""ip_address"": ""67.76.188.26""}",,,,,,


### DROPMALFORMED

In [0]:
# DROPMALFORMED silently discards records that cannot be parsed, so only the
# well-formed rows are displayed.
(
    spark.read
    .option("mode", "DROPMALFORMED")
    .json("/FileStore/synechron/access_logs.json")
    .display()
)

email,first_name,gender,id,ip_address,last_name
jpenddreth0@census.gov,Jeanette,Female,1,26.58.193.2,Penddreth
gfrediani1@senate.gov,Giavani,Male,2,229.179.4.212,Frediani
nbea2@imageshack.us,Noell,Female,3,180.66.162.255,Bea


### FAILFAST

In [0]:
# FAILFAST aborts the read when it meets a malformed record; this cell is
# expected to raise (a Py4JJavaError in the recorded run) to demonstrate that.
(
    spark.read
    .option("mode", "FAILFAST")
    .json("/FileStore/synechron/access_logs.json")
    .display()
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1026188932982138>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mjson[49m[43m([49m[43mpath[49m[38;5;241;43m=[39;49m[38;5;124;43m"[39;49m[38;5;124;43m/FileStore/synechron/access_logs.json[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[43mmode[49m[38;5;241;43m=[39;49m[38;5;124;43m"[39;49m[38;5;124;43mFAILFAST[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241m.[39mdisplay()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38