### Reading three different file types (CSV, Parquet, JSON)

In [0]:

from pyspark.sql.functions import col

# --- Load the three source files: CSV, Parquet and JSON -----------------------
customers_df = spark.read.csv(
    "/Volumes/workspace/default/filestore/customers.csv",
    header=True,
    inferSchema=True,
)
products_df = spark.read.parquet("/Volumes/workspace/default/filestore/products.parquet")
# NOTE(review): later cells read this same file with multiline=True; confirm
# which setting matches the actual layout of sales.json.
sales_df = spark.read.json("/Volumes/workspace/default/filestore/sales.json")
display(sales_df)

# --- Append a copy of the C001 row(s) so the frame contains a duplicate -------
duplicate_row = customers_df.where(col("customer_id") == 'C001')
customers_with_duplicate = customers_df.union(duplicate_row)

#display(customers_with_duplicate)

# --- Simple cleaning: one row per key, then drop rows containing any null -----
customers_df = customers_with_duplicate.dropDuplicates(["customer_id"]).dropna()
sales_df = sales_df.dropDuplicates(["sale_id"]).dropna()
products_df = products_df.dropDuplicates(["product_id"]).dropna()

#display(customers_df)

# --- Enrich each sale with customer and product attributes --------------------
# Left joins keep every sale even when the customer or product is missing.
sales_joined = (
    sales_df
    .join(customers_df, on="customer_id", how="left")
    .join(products_df, on="product_id", how="left")
)

# total_amount = quantity * price, computed per sale row.
sales_final = sales_joined.withColumn("total_amount", col("quantity") * col("price"))

display(sales_final)

# --- Disabled: revenue-per-category aggregation and Delta round-trip ----------
#sales_final.explain(True)

#display(sales_final)

#from pyspark.sql.functions import sum as _sum

# Revenue per category
#category_revenue = sales_final.groupBy("category").agg(_sum("total_amount").alias("total_revenue"))

#display(category_revenue)

#sales_final.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/FileStore/tables/delta/sales_final")
#category_revenue.write.format("delta").mode("overwrite").save("/Volumes/workspace/default/FileStore/tables/delta/category_revenue")

#spark.read.format("delta").load("/Volumes/workspace/default/FileStore/tables/delta/category_revenue").createOrReplaceTempView("category_revenue")


#result=spark.sql("SELECT * FROM category_revenue Where total_revenue > 1000 ORDER BY total_revenue DESC")
#display(result)






In [0]:
# Read sales.json and let Spark infer the schema from the data.
# multiline=True lets a single JSON record span several lines of the file.
# The former "inferSchema" and "header" options were removed: they are CSV
# reader options that the JSON source does not define, so they had no effect.
df_json = (
    spark.read.format('json')
    .option("multiline", True)
    .load("/Volumes/workspace/default/filestore/sales.json")
)

In [0]:
# Render the schema-inferred sales data as an interactive table.
display(df_json)

### Schema Definition

In [0]:
# Print the schema of df_json (read above without an explicit schema) as a
# tree of column names, types and nullability.
df_json.printSchema()

### DDL Schema

In [0]:
# Explicit schema for sales.json written as a DDL string
# ("column_name type" pairs separated by commas); sale_id is read as long.
my_ddl_schema = ''' 
customer_id string,
product_id string,
quantity double,
sale_id long
'''

In [0]:
# Read sales.json using the explicit DDL schema instead of inference.
# The former "inferSchema" and "header" options were removed: they are CSV
# reader options that the JSON source does not define, and asking for
# inference next to an explicit .schema(...) was contradictory.
df = (
    spark.read.format('json')
    .schema(my_ddl_schema)
    .option("multiline", True)
    .load("/Volumes/workspace/default/filestore/sales.json")
)


In [0]:
# Show the sales data as read with my_ddl_schema.
display(df)

In [0]:
# Plain-text preview (show() defaults spelled out: 20 rows, long values truncated),
# followed by the schema so the DDL-declared types can be checked.
df.show(n=20, truncate=True)
df.printSchema()

In [0]:
# Alternative DDL schema for sales.json: same columns as my_ddl_schema,
# but sale_id is declared as double instead of long.
new_schema='''
customer_id string,
product_id string,
quantity double,
sale_id double
'''

In [0]:
# Read sales.json again, this time with new_schema (sale_id as double).
# The former "inferSchema" and "header" options were removed: they are CSV
# reader options that the JSON source does not define, and asking for
# inference next to an explicit .schema(...) was contradictory.
cust_df = (
    spark.read.format('json')
    .schema(new_schema)
    .option("multiline", True)
    .load("/Volumes/workspace/default/filestore/sales.json")
)

In [0]:
# Show the data and confirm that sale_id now has the double type from new_schema.
display(cust_df)
cust_df.printSchema()

### Select

In [0]:
# Project three columns; select() accepts the names as a single list as well.
display(cust_df.select(['customer_id', 'product_id', 'quantity']))

In [0]:
# selectExpr takes SQL expressions, so a column can be renamed inline with "as".
new_df = cust_df.selectExpr(['customer_id as cust_id', 'product_id', 'quantity'])
new_df.display()
# cust_df is unchanged: selectExpr returned a new DataFrame, bound to new_df.
cust_df.display()

In [0]:
# Rows where EITHER condition holds (| is the column-level OR).
display(
    new_df.where((col("cust_id") == 'C001') | (col("product_id") == 'P02'))
)

In [0]:
# Rows where BOTH conditions hold (& is the column-level AND).
display(
    new_df.where((col("cust_id") == 'C001') & (col("product_id") == 'P01'))
)

In [0]:
# Keep rows whose cust_id is one of the listed values.
display(new_df.where(col("cust_id").isin(['C001', 'C002'])))


In [0]:
# Rename cust_id back to customer_id for display only; new_df is not reassigned.
display(new_df.withColumnRenamed('cust_id', 'customer_id'))

### withColumn

In [0]:
from pyspark.sql.functions import *
# Build order_id as "<cust_id>-<product_id>"; lit() wraps the constant separator.
display(
    new_df.withColumn(
        "order_id",
        concat(col("cust_id"), lit('-'), col("product_id")),
    )
)

In [0]:
# Add order_id as cust_id with 'C' replaced by 'D', then overwrite cust_id
# itself with the same replacement. order_id is computed first, so it is
# derived from the original cust_id values.
display(
    new_df
    .withColumn("order_id", expr("regexp_replace(cust_id, 'C', 'D')"))
    .withColumn("cust_id", expr("regexp_replace(cust_id,'C','D')"))
)


### Type casting

In [0]:
# Replace quantity (declared double in the schema) with its long-typed cast.
# Re-running this cell is harmless: casting a long column to long changes nothing.
quantity_as_long = new_df["quantity"].cast("long")
new_df = new_df.withColumn("quantity", quantity_as_long)
new_df.printSchema()

In [0]:
# Largest quantities first.
display(new_df.orderBy("quantity", ascending=False))

In [0]:
# Two ways to express a multi-column sort with mixed directions.
# 1) Direction attached to each column: quantity ascending, then cust_id descending.
display(new_df.orderBy(col("quantity").asc(), col("cust_id").desc()))
# 2) Column names plus a parallel list of flags: cust_id descending, then quantity ascending.
display(new_df.orderBy("cust_id", "quantity", ascending=[False, True]))

In [0]:
# Earlier experiments, kept for reference (disabled):
# new_df.drop('cust_id').display()
# new_df.display()
# spark.sql("insert into new_df values('C001','P01',100,1)")
# new_df.display()

# new_df.createOrReplaceTempView("new_df")
# display(spark.sql("select * from new_df"))
# spark.sql("insert into new_df values('C001','P01',100,1)")
# display(spark.sql("select * from new_df"))

# Persist df (the four-column frame, not new_df) as the table default.new_df,
# replacing the table if it already exists.
df.write.saveAsTable("default.new_df", mode="overwrite")

# With a real table in place, SQL INSERT works. The values are positional and
# follow df's column order: customer_id, product_id, quantity, sale_id.
spark.sql("INSERT INTO default.new_df VALUES('C001','P01',100,1)")

spark.sql("SELECT * FROM default.new_df").display()
# df is still read from sales.json, so the row inserted into the table is not part of it.
display(df.distinct())





In [0]:
# Show df once more for comparison with the table contents above.
df.display()

In [0]:

# Two small frames that share the same column order (id, name).
schema1 = 'id STRING, name STRING'
data1 = [('1', 'kad'), ('2', 'sid')]
df1 = spark.createDataFrame(data=data1, schema=schema1)
df1.display()

schema2 = 'id STRING, name STRING'
data2 = [('3', 'rahul'), ('4', 'jas')]
df2 = spark.createDataFrame(data=data2, schema=schema2)
df2.display()

# union() pairs columns by position; the order matches here, so the result lines up.
df1.union(df2).display()

In [0]:
# df1 now lists its columns as (name, id) while df2 keeps (id, name).
schema1 = 'name STRING, id STRING'
data1 = [('kad', '1'), ('sid', '2')]
df1 = spark.createDataFrame(data=data1, schema=schema1)
df1.display()

schema2 = 'id STRING, name STRING'
data2 = [('3', 'rahul'), ('4', 'jas'), ('', '')]
df2 = spark.createDataFrame(data=data2, schema=schema2)

# how="all" drops only rows in which every column is null. The ('', '') row
# holds empty strings rather than nulls, so it is kept.
display(df2.dropna(how="all"))
df2.display()
display(df1.dropDuplicates())

# union() pairs columns by position, so df2's id values end up under df1's
# "name" column (shown twice, once per display style).
df1.union(df2).display()
display(df1.union(df2))

# unionByName() pairs columns by name, so the values line up correctly.
df1.unionByName(df2).display()
display(df1.unionByName(df2))

In [0]:
# initcap as a standalone projection (yields a single derived column) ...
display(df.select(initcap(col("customer_id"))))
# ... and as an in-place replacement of customer_id that keeps the other columns.
display(df.withColumn("customer_id", initcap(col("customer_id"))))
# Neither result was assigned, so df itself still shows the original values.
display(df)

In [0]:
# Drop only the rows in which every column is null.
display(df.na.drop(how='all'))

In [0]:
# Final look at both frames: df (four columns, DDL schema) and new_df
# (three columns, cust_id renamed, quantity cast to long).
display(df)
display(new_df)