In [0]:
# Tables used in this notebook: orders, customers

In [0]:
# Bronze layer: incrementally ingest raw order JSON files with Auto Loader
# ("cloudFiles") and append them to the `orders` Delta table.
#
# NOTE(review): `json_schema` is not defined anywhere in this notebook; it must be
# defined (e.g. as a StructType) before this cell runs. TODO confirm where it lives.
# This cell runs before the shared import cell, so import what it needs here.
from pyspark.sql.functions import col, current_timestamp

# The CSV-only options (header/escape/quote) were dropped: they have no effect on
# the JSON reader. `multiline` is kept because each file may hold multi-line JSON.
orders_raw = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'json')
    .option('cloudFiles.schemaLocation', 'dbfs:/FileStore/tables/orders/schema')
    .option('multiline', 'true')
    .schema(json_schema)
    .load('/FileStore/tables/orders/orders')
)

# Preview. On a streaming DataFrame, Databricks' display() starts a streaming query.
display(orders_raw.limit(5))

# DataFrames are immutable, so each transformation must be chained/assigned.
# Otherwise the result is silently discarded.
orders_bronze = (
    orders_raw
    # Drop the raw JSON dump column, which is not needed downstream.
    .drop('dump')
    # Rows without cust_id are rejected.
    # NOTE(review): the gold SQL joins on `customer_id`; confirm the raw column is `cust_id`.
    .filter(col('cust_id').isNotNull())
    # Ingestion timestamp.
    .withColumn('current_ts', current_timestamp())
    # Source file path from the file-metadata column. Must be a Column, not a string literal.
    .withColumn('file_name', col('_metadata.file_path'))
)

# Append to the Delta table. DataStreamWriter exposes toTable(); there is no .table().
# NOTE(review): the silver cell reads `bronze.orders`; this writes `orders` in the
# current schema, so confirm the current database is `bronze`.
bronze_query = (
    orders_bronze.writeStream.format('delta')
    .partitionBy('order_date')
    .outputMode('append')
    .option('checkpointLocation', 'dbfs:/FileStore/tables/orders/checkpoint')
    .toTable('orders')
)

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *

In [0]:
# Silver layer: flatten, type-cast, validate and deduplicate the bronze orders,
# then append them to the silver Delta path.
from pyspark.sql.functions import col, from_json, sum as sum_

# Read from the bronze Delta table.
orders_bronze = spark.read.table('bronze.orders')

# Flatten nested JSON first, so the int cast below is applied to the final
# order_id. Previously the flatten ran after the cast and overwrote it.
# NOTE(review): `nested_struct_schema` is not defined in this notebook; it must be
# defined before this cell runs. TODO confirm.
orders_flat = (
    orders_bronze
    .withColumn('nested_col', from_json(col('nested_struct'), nested_struct_schema))
    .withColumn('order_id', col('nested_col.order_id'))
    # The column name previously had a trailing space ('order_qty ').
    .withColumn('order_qty', col('nested_col.order_qty'))
)

orders_silver = (
    orders_flat
    # Type casting.
    .withColumn('order_id', col('order_id').cast('int'))
    # Rename column. withColumnRenamed is a no-op if the column is missing.
    # NOTE(review): the bronze cell adds `current_ts`, not `timestamp_ts`; confirm which is intended.
    .withColumnRenamed('timestamp_ts', 'current_timestamp_new')
    # Business rules. filter() takes a single condition, so combine them with `&`.
    .filter((col('age') > 18) & (col('amount') >= 0))
    # Deduplicate on the order key. Which duplicate row survives is not deterministic.
    .dropDuplicates(['order_id'])
)

# Inner join keeps only orders with a matching customer. No explicit broadcast
# hint is used because both tables are assumed large. Spark may still auto-broadcast
# a side smaller than spark.sql.autoBroadcastJoinThreshold.
# order_qty must be aggregated because it is not in the GROUP BY.
top_orders = spark.sql(
    '''
    select o.order_id, c.customer_id, sum(o.order_qty) as order_qty
    from orders o
    inner join customers c on o.customer_id = c.customer_id
    group by c.customer_id, o.order_id
    order by order_qty desc
    limit 5
    '''
)

# Write to silver. Append mode only adds rows; it does NOT upsert. Dedup above is
# per batch, so an order_id already written by a previous run will be duplicated.
# Use a Delta MERGE on order_id if true upserts are required.
# Not partitioned until query patterns / column cardinality are known.
(
    orders_silver.write.format('delta')
    .mode('append')
    .save('/FileStore/tables/silver/orders')
)



In [0]:
'''

GOLD LAYER — Analytics-ready (SQL)
Here you switch to Spark SQL 👌
This is a good signal in interviews.
What I want you to write and share:
1. Create Gold table / view using SQL
Include:
Aggregations (GROUP BY)
Joins
Business metrics
Examples:
total_orders per customer
total_revenue per day
top customers by spend
running totals or rank (window function)
2. Explain why SQL here
Fail-fast
Easier debugging
Analytics consumption
👉 This proves you understand layer intent, not just syntax.

'''

In [0]:
%sql
-- Gold layer: analytics-ready aggregates over the `orders` and `customers` tables.
-- Every statement ends with `;` so the cell parses as separate queries.
-- Without the semicolons, consecutive SELECTs run together into one invalid statement.

-- Aggregations (GROUP BY): orders per day.
-- Grouping by order_id itself would always yield 1 for a unique key.
select order_date, count(order_id) as total_orders
from orders
group by order_date
order by order_date;

-- Joins: top 5 (customer, order) pairs by quantity.
-- order_qty is aggregated because it is not in the GROUP BY.
select o.order_id, c.customer_id, sum(o.order_qty) as order_qty
from orders o
inner join customers c on o.customer_id = c.customer_id
group by c.customer_id, o.order_id
order by order_qty desc
limit 5;

-- Business metric: total orders per customer.
select c.customer_id, count(o.order_id) as total_orders
from orders o
inner join customers c on o.customer_id = c.customer_id
group by c.customer_id
order by total_orders desc;

-- Business metric: total revenue per day.
-- The inner join excludes orders with no matching customer.
select o.order_date, sum(o.order_qty * o.unit_price) as revenue
from orders o
inner join customers c on o.customer_id = c.customer_id
group by o.order_date
order by revenue desc;

-- Business metric: top 10 customers by spend.
select c.customer_id, sum(o.order_qty * o.unit_price) as total_spend
from orders o
inner join customers c on o.customer_id = c.customer_id
group by c.customer_id
order by total_spend desc
limit 10;

-- Running total of amount per customer, accumulated chronologically.
-- Ordering by order_date, not amount, makes this a time-based running total.
-- Rows sharing an order_date are accumulated in arbitrary order within that date.
-- NOTE(review): `master_table` is not created in this notebook; confirm its source.
select customer_id, order_date, amount,
       sum(amount) over (
           partition by customer_id
           order by order_date
           rows between unbounded preceding and current row
       ) as cum_sum
from master_table;