In [0]:
from pyspark.sql.functions import *
from pyspark.errors import AnalysisException
from pyspark.sql.types import StringType, StructField, StructType

In [0]:
# Load the sample CSV: the first row supplies column names and Spark infers column types.
dff = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/workspace/default/batch12-volume/sample data.csv")
)
display(dff)

In [0]:
# Load the file used for the null-handling examples (header row + inferred types).
df = spark.read.options(header=True, inferSchema=True).csv(
    "/Volumes/workspace/default/batch12-volume/null.txt"
)
display(df)

# How to handle null values

In [0]:
# Each call returns a new DataFrame; `df` itself is never modified.
df2 = df.na.drop(how="any")                        # no-arg default: drop rows containing at least one null
display(df2)
df3 = df.dropna(how="any")                         # the same rule, spelled explicitly
display(df3)
df4 = df.dropna(how="all")                         # drop only rows where every column is null
display(df4)
df5 = df.dropna(how="any", subset=["id", "city"])  # only nulls in `id` or `city` count
display(df5)
df6 = df.na.drop()                                 # DataFrame.dropna and na.drop are aliases
display(df6)

In [0]:
# Fill null values.
# A single string fill value only replaces nulls in string-typed columns; non-string
# columns (e.g. `id`, if it was inferred as a number) keep their nulls.
df7 = df.fillna("Not Available")
display(df7)
df8 = df.na.fill("Not Available")  # na.fill is an alias of fillna
display(df8)
# Per-column fill values: each dict key is a column name, each value is that column's replacement.
data = {"id": 0, "name": "NA", "city": "Unknown"}
df9 = df.na.fill(data)
display(df9)

# How to read JSON files

In [0]:
# Read the orders JSON file with the default (one JSON object per line) reader.
df_json = spark.read.format("json").load(
    "/Volumes/workspace/default/batch12-volume/Order_JSON.json"
)
display(df_json)

In [0]:
# Read the customer JSON file; the schema is inferred from the data.
df_j = spark.read.load(
    "/Volumes/workspace/default/batch12-volume/saleslt.customer.json", format="json"
)
display(df_j)

# How to read Parquet files

In [0]:
# Parquet files carry their own schema, so no header/inference options are needed.
df_par = spark.read.format("parquet").load(
    "/Volumes/workspace/default/batch12-volume/SalesLT.Customer.parquet"
)
display(df_par)

# How to read CSV files that don't use comma separators (and capture malformed rows)

In [0]:
# 1) Default parse assumes ',' as the delimiter. If the file actually uses another
#    delimiter (e.g. '|'), each line ends up in a single column.
df_bad = spark.read.csv("/Volumes/workspace/default/batch12-volume/sales_orders_bad_data.csv", header=True, inferSchema=True)
display(df_bad)

# 2) Tell the reader the real delimiter.
df10 = spark.read.csv("/Volumes/workspace/default/batch12-volume/sales_orders_bad_data.csv", header=True, inferSchema=True, sep= '|')
display(df10)

# 3) Keep malformed rows in PERMISSIVE mode.
#    For CSV, Spark only fills the corrupt-record column when that column is part of a
#    user-supplied schema; schema inference never adds it, so with inferSchema the
#    malformed text would be silently dropped. Build the schema explicitly instead:
#    the columns inferred for df_bad (same file, same parse options) plus a nullable
#    string `_corrupt_record` column. A new StructType is built rather than calling
#    StructType.add(), because add() mutates df_bad's schema object in place.
corrupt_schema = StructType(
    df_bad.schema.fields + [StructField("_corrupt_record", StringType(), True)]
)
df11 = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(corrupt_schema)
    .csv("/Volumes/workspace/default/batch12-volume/sales_orders_bad_data.csv")
)

display(df11)


# How to handle multiline CSV
Use the `multiLine=True` option.

In [0]:
# Default read: each physical line is parsed as its own record, so a quoted value
# containing line breaks gets split apart.
df_mul = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load("/Volumes/workspace/default/batch12-volume/Multiline_example.csv")
)
display(df_mul)

# multiLine=True lets one record (a quoted field) span several lines.
df12 = spark.read.options(header=True, inferSchema=True, multiLine=True).csv(
    "/Volumes/workspace/default/batch12-volume/Multiline_example.csv"
)
display(df12)

# Same multiline read, but with an explicit DDL schema instead of type inference.
multiline_Schema = "id int, name string, description string"
df13 = (
    spark.read.format("csv")
    .schema(multiline_Schema)
    .options(header=True, multiLine=True)
    .load("/Volumes/workspace/default/batch12-volume/Multiline_example.csv")
)
display(df13)


# How to handle multiline JSON

In [0]:
# A JSON document spread over several lines (e.g. pretty-printed) needs multiLine=True;
# the default reader expects one complete JSON object per line.
df14 = (
    spark.read.format("json")
    .option("multiLine", True)
    .load("/Volumes/workspace/default/batch12-volume/Multiline_example.json")
)
display(df14)

# How to work with nested JSON


In [0]:
# Read JSON whose `address` field is a nested object.
df15 = spark.read.format("json").load(
    "/Volumes/workspace/default/batch12-volume/SampleNested_JASON.json"
)
display(df15)

# Flatten: dotted paths lift individual nested fields to top-level columns,
# each named after its leaf field (e.g. `city`).
df16 = df15.select(
    "age",
    "email",
    "name",
    *[f"address.{leaf}" for leaf in ("city", "state", "street", "zip")],
)
display(df16)

In [0]:
df17 = spark.read.option("multiLine", True).json(
    "/Volumes/workspace/default/batch12-volume/Array_Explode_Example.json"
)
display(df17)

# explode: one output row per array element.
# NOTE: rows whose array is null or empty produce no output rows (explode_outer would keep them).
df18 = df17.select(col("customer_id"), explode(col("product_ids")).alias("product_ids"))
display(df18)

# Positional access: pick the first and second array elements directly (null when missing).
df19 = df17.select("customer_id", col("product_ids")[0], col("product_ids")[1])
display(df19)

# Re-assemble the exploded rows into one array per customer.
# NOTE: element order inside each collected list is not guaranteed after the shuffle.
df20 = df18.groupBy("customer_id").agg(collect_list(col("product_ids")).alias("product_ids"))
display(df20)

# How to handle duplicate rows using `distinct` and `dropDuplicates`

In [0]:
# NOTE: this cell reuses the names df17-df20 from the previous cell for a different dataset.
df17 = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/Volumes/workspace/default/batch12-volume/duplicate.txt")
)
display(df17)

# With no arguments, distinct() and dropDuplicates() both compare every column.
df18 = df17.distinct()
display(df18)
df19 = df17.drop_duplicates()  # alias of dropDuplicates
display(df19)

# With a subset, rows are duplicates when `city` and `id` match; which of them is kept is unspecified.
df20 = df17.dropDuplicates(["city", "id"])
display(df20)

# How to work with string data

In [0]:
# Orders exported from Excel as CSV; header row + inferred types.
df21 = spark.read.options(header=True, inferSchema=True).csv(
    "/Volumes/workspace/default/batch12-volume/Order_Excel.csv"
)
display(df21)


# Built-in string functions

In [0]:
# Demonstrate built-in string functions on `Region` (concat_ws also uses `Country`).
# These functions come from the `from pyspark.sql.functions import *` in the first cell,
# so they are not re-imported here. Each result gets an alias so the displayed columns
# are readable (the auto-generated names are expression strings).
df22 = df21.select(
    lower("Region").alias("region_lower"),
    upper("Region").alias("region_upper"),
    initcap("Region").alias("region_initcap"),        # first letter of each word upper-cased
    ltrim("Region").alias("region_ltrim"),            # leading spaces removed
    rtrim("Region").alias("region_rtrim"),            # trailing spaces removed
    trim("Region").alias("region_trim"),              # spaces removed at both ends
    length("Region").alias("region_length"),
    translate("Region", "A", "B").alias("region_translate_A_to_B"),  # character-by-character replace
    concat_ws(" | | ", "Region", "Country").alias("region_country"),
    substring("Region", 1, 2).alias("region_first_2_chars"),         # start position is 1-based
    substring_index("Region", " ", 2).alias("region_first_2_words"), # text before the 2nd space
    split("Region", " ").alias("region_words"),       # previously listed twice (duplicate column)
)
display(df22)

# How to handle invalid date values

In [0]:
df23 = spark.read.options(header=True, inferSchema=True).csv(
    "/Volumes/workspace/default/batch12-volume/dates-batch11_wrong_date.txt"
)
display(df23)

# Parse `reg_date` with the dd/MM/yy pattern; try_to_date yields null for values that
# cannot be parsed instead of failing the query.
# NOTE(review): "OderDate" looks like a misspelling of "OrderDate". It is kept as-is because
# this column name is what the later cells write out; confirm before renaming.
parsed_date = try_to_date(col("reg_date"), "dd/MM/yy")
df24 = df23.withColumn("OderDate", parsed_date)
display(df24)

In [0]:
# Print df24's schema (column names, types, nullability), including the derived OderDate column.
df24.printSchema()

In [0]:
# DataFrame.columns is a plain Python list of column names.
cloName = df24.columns
print(cloName)
print(type(cloName))

# collect() runs the whole query and brings every row to the driver as a list of Row
# objects. That is fine for small demo data but should be avoided on large DataFrames.
# Collect once and reuse the result instead of launching the same job twice.
result = df24.collect()
print(result)
# Rows are indexable by position: result[0][1] is the second column of the first row.
if result:
    print(result[0][1])
else:
    print("df24 is empty: no rows to index")

In [0]:
# limit(num) is a transformation: a new DataFrame with at most `num` rows.
df25 = df24.limit(num=2)
display(df25)

# head() with no argument (same as first()) is an action returning a single Row,
# or None when df24 is empty. It does not return a DataFrame.
df26 = df24.head()
display(df26)

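# Writing the DataFrame
Save modes:
1) overwrite – replace any existing data at the output path
2) append – add the new rows to the existing data
3) ignore – silently skip the write if data already exists
4) error (also `errorifexists`; the default) – fail if data already exists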

In [0]:
# Write df24 as JSON, replacing whatever is already at the output path.
(
    df24.write.format("json")
    .mode("overwrite")
    .save("/Volumes/workspace/default/batch12-volume/Output/dates-batch12_correct_date_example")
)


In [0]:
# Read back the JSON dataset written by the overwrite cell.
# Point at the output directory, not at a specific part file: part-file names embed a
# per-write task id and UUID, so a hard-coded file name breaks once the write is re-run.
# Spark skips bookkeeping files whose names start with "_" (e.g. _SUCCESS).
df27 = spark.read.json("/Volumes/workspace/default/batch12-volume/Output/dates-batch12_correct_date_example")
display(df27)

In [0]:
# "append": add df24's rows to the existing dataset at this path.
# Use the same format ("json") as the dataset written by the overwrite cell. Appending
# parquet files into that JSON directory would leave a mixed-format directory that neither
# spark.read.json nor spark.read.parquet can read cleanly.
df24.write.mode("append").format("json").option("path","/Volumes/workspace/default/batch12-volume/Output/dates-batch12_correct_date_example").save()

In [0]:
# "ignore": because the output path already exists, this write is silently skipped
# (no error, no data written). The format is "json" to match the dataset already at
# this path, in case the path is absent and the write does happen.
df24.write.mode("ignore").format("json").option("path","/Volumes/workspace/default/batch12-volume/Output/dates-batch12_correct_date_example").save()

In [0]:
# "error" (alias "errorifexists", Spark's default save mode) refuses to write when the
# output path already exists, which it does after the earlier writes. The failure is the
# point of this demo, so catch and print it; otherwise Run All would abort here.
try:
    df24.write.mode("error").format("json").option("path","/Volumes/workspace/default/batch12-volume/Output/dates-batch12_correct_date_example").save()
except AnalysisException as exc:
    print(f"mode('error') write failed as expected: {exc}")

# Partition

In [0]:
# repartition shuffles df24 into exactly 4 partitions; the last expression shows the count.
# NOTE(review): DataFrame.rdd may be unavailable on Spark Connect / serverless compute;
# confirm the cluster type if this cell fails.
df28 = df24.repartition(numPartitions=4)
df28.rdd.getNumPartitions()