# Exploring Data Source API

## Creating DF from different sources

### Create DF from CSV Data

In [0]:
user_df = spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True
)
user_df.limit(4).display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537


In [0]:
user_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gen: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: integer (nullable = true)



### Create DF from custom delimited

In [0]:
df = spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/delimited_format/users_001.dat",
    sep="|",
    header=True,
    inferSchema=True,
)
df.limit(4).display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537


### Create DF from JSON

In [0]:
user_df = spark.read.json(
    path="dbfs:/FileStore/synechron009/dataset/users/json_format/users_001.json"
)

user_df.limit(4).display()

age,designation,gender,id,salary
0,technician,M,1,85711
53,other,F,2,94043
23,writer,M,3,32067
0,technician,M,4,43537


In [0]:
user_df = spark.read.json(
    path="dbfs:/FileStore/synechron009/dataset/users/json_format/users_002.json",
    multiLine=True
)

user_df.limit(4).display()

age,designation,gender,id,salary
0,technician,M,1,85711


In [0]:
user_df = spark.read.json(
    path="dbfs:/FileStore/synechron009/dataset/users/json_format/users_003.json",
    multiLine=True
)

user_df.limit(4).display()

age,designation,gender,id,salary
0,technician,M,1,85711
53,other,F,2,94043


### Create DF from Paraquet

In [0]:
user_df = spark.read.parquet("dbfs:/FileStore/synechron009/dataset/users/parquet_format/users.parquet")
user_df.limit(4).display()

id,age,gender,designation,salary
1,0,M,technician,85711
2,53,F,other,94043
3,23,M,writer,32067
4,0,M,technician,43537


## Custom Schema

In [0]:
from pyspark.sql.types import *

USER_SCHEMA = StructType(
    [
        StructField("id", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gender", IntegerType()),
        StructField("designation", StringType()),
        StructField("salary", IntegerType()),
    ]
)
user_df = spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    schema=USER_SCHEMA,
)
user_df.limit(4).display()

id,age,gender,designation,salary
1,26,,Technician,85711
2,53,,Other,94043
3,23,,Writer,32067
4,26,,technician,43537


## Handling Corrupt/Bad Record

### PERMISSIVE (default)

In [0]:
access_logs = spark.read.option("columnNameOfCorruptRecord", "bad_record").json(
    path="dbfs:/FileStore/synechron009/dataset/access_logs.json", mode="PERMISSIVE"
)
access_logs.display()

bad_record,email,first_name,gender,id,ip_address,last_name
,jpenddreth0@census.gov,Jeanette,Female,1.0,26.58.193.2,Penddreth
,gfrediani1@senate.gov,Giavani,Male,2.0,229.179.4.212,Frediani
,nbea2@imageshack.us,Noell,Female,3.0,180.66.162.255,Bea
"{""id"": 4,""first_name"": {,""last_name"": ""Valek"",""email"": ""wvalek3@vk.com"",""gender"": ""Male"",""ip_address"": ""67.76.188.26""}",,,,,,


### DROPMALFORMED

In [0]:
access_logs = spark.read.json(
    path="dbfs:/FileStore/synechron009/dataset/access_logs.json", mode="DROPMALFORMED"
)
access_logs.display()

email,first_name,gender,id,ip_address,last_name
jpenddreth0@census.gov,Jeanette,Female,1,26.58.193.2,Penddreth
gfrediani1@senate.gov,Giavani,Male,2,229.179.4.212,Frediani
nbea2@imageshack.us,Noell,Female,3,180.66.162.255,Bea


### FAILFAST

In [0]:
access_logs = spark.read.json(
    path="dbfs:/FileStore/synechron009/dataset/access_logs.json", mode="FAILFAST"
)
access_logs.display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3004113653720669>:1[0m
[0;32m----> 1[0m access_logs [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mjson(
[1;32m      2[0m     path[38;5;241m=[39m[38;5;124m"[39m[38;5;124mdbfs:/FileStore/synechron009/dataset/access_logs.json[39m[38;5;124m"[39m, mode[38;5;241m=[39m[38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m
[1;32m      3[0m )
[1;32m      4[0m access_logs[38;5;241m.[39mdisplay()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs

#Exploring DataFrameWriter API

## Writing DF as JSON

In [0]:
spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True,
).write.mode("overwrite").format("json").save(
    "dbfs:/FileStore/synechron009/output/json"
)

In [0]:
spark.read.json(path="dbfs:/FileStore/synechron009/output/json").limit(4).display()

age,designation,gen,id,salary
26,Technician,M,1,85711
53,Other,F,2,94043
23,Writer,M,3,32067
26,technician,M,4,43537


## Modes

### Overwrite

In [0]:
spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True,
).write.mode("ovewrite").format("json").save(
    "dbfs:/FileStore/synechron009/output/json"
)

### Append

In [0]:
spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True,
).write.mode("append").format("json").save(
    "dbfs:/FileStore/synechron009/output/json"
)

### Error If Exists

In [0]:
spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True,
).write.format("json").save(
    "dbfs:/FileStore/synechron009/output/json"
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3004113653720678>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mcsv[49m[43m([49m
[1;32m      2[0m [43m    [49m[43mpath[49m[38;5;241;43m=[39;49m[38;5;124;43m"[39;49m[38;5;124;43mdbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv[39;49m[38;5;124;43m"[39;49m[43m,[49m
[1;32m      3[0m [43m    [49m[43mheader[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m[43m,[49m
[1;32m      4[0m [43m    [49m[43minferSchema[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m[43m,[49m
[1;32m      5[0m [43m)[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mjson[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;24

# DataFrame Operations

## Basic Operations

In [0]:
user_df = spark.read.csv(
    path="dbfs:/FileStore/synechron009/dataset/users/csv_format/users_001.csv",
    header=True,
    inferSchema=True
)
user_df.select("id","age","gen").limit(3).display()

id,age,gen
1,26,M
2,53,F
3,23,M


## col function

In [0]:
from pyspark.sql.functions import col, concat, lit
user_df.select(concat(col("id"),lit("-"),col("gen"))).display()

"concat(id, -, gen)"
1-M
2-F
3-M
4-M
5-F
6-M
7-M
8-M
9-M
10-M


In [0]:
user_df.filter(col("gen")=="M").limit(4).display()

id,age,gen,designation,salary
1,26,M,Technician,85711
3,23,M,Writer,32067
4,26,M,technician,43537
6,42,M,Cheif Executive Officer,98101


In [0]:
user_df.display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537
5,33,F,Other,15213
6,42,M,Cheif Executive Officer,98101
7,57,M,Administrator,91344
8,36,M,Administrator,5201
9,29,M,Student,1002
10,53,M,Lawyer,90703


In [0]:
user_df.filter(col("designation").isin("Other", "Student")).limit(14).display()

id,age,gen,designation,salary
2,53,F,Other,94043
5,33,F,Other,15213
9,29,M,Student,1002
11,39,F,Other,30329
12,28,F,Other,6405
18,35,F,Other,37212
30,7,M,Student,55436
32,28,F,Student,78741
33,23,M,Student,27510
36,19,F,Student,93117


In [0]:
from pyspark.sql.functions import lower
user_df.filter(lower(col("designation"))=="technician").display()

id,age,gen,designation,salary
1,26,M,Technician,85711
4,26,M,technician,43537
44,26,M,Technician,46260
77,30,M,Technician,29379
143,42,M,Technician,8832
197,55,M,Technician,75094
244,28,M,Technician,100005
245,26,F,Technician,110000


In [0]:
words = ["Te","Ot"]
check = None
for word in words:
    if check is None:
        check = col("designation").contains(word)
    else:
        check = check |  col("designation").contains(word)

user_df = user_df.filter(check).limit(14)
user_df.display()

id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
5,33,F,Other,15213
11,39,F,Other,30329
12,28,F,Other,6405
18,35,F,Other,37212
38,28,F,Other,54467
44,26,M,Technician,46260
77,30,M,Technician,29379
83,40,M,Other,44133


## Grouping, Aggregation and Sorting

In [0]:
user_df.groupBy(col("designation")).count().filter(col("count")>10).display()


designation,count
Student,39
Programmer,20
Other,20
Cheif Executive Officer,12
Librarian,14
Artist,11
Engineer,16
Administrator,27
Educator,25


In [0]:
from pyspark.sql.functions import count, max, min
user_df.groupBy(col("designation")).agg(count("*").alias("count"),
                                        max("age").alias("age"),
                                        min("age")).display()

designation,count,age,min(age)
Technician,3,30,26
Other,11,53,24


# Handling Missing Records