In [0]:
%sql
-- Show the metastore the current session is attached to.
SELECT current_metastore()

In [0]:
%sql
-- Make newpavancatalog.bronze the default namespace for unqualified names.
USE CATALOG newpavancatalog;
USE SCHEMA bronze;

In [0]:
%sql
-- Register the landing container as an external location (no-op when it already exists).
-- NOTE(review): this URL targets storage account `pavansa28`, while the Auto Loader
-- source and checkpoint paths in this notebook use `parkaru15sa` - confirm which is intended.
CREATE EXTERNAL LOCATION IF NOT EXISTS pavannewexternallocation
URL 'abfss://landing@pavansa28.dfs.core.windows.net/'
WITH (STORAGE CREDENTIAL pavannewaccessconnctor)
COMMENT 'External Location for yellow taxi nyc Purposes'

In [0]:
%sql
-- List the external locations visible to the current user.
SHOW EXTERNAL LOCATIONS;

In [0]:
%sql
-- Inspect the external location registered for the landing container.
DESCRIBE EXTERNAL LOCATION
  pavannewexternallocation;

In [0]:
%fs ls 'abfss://landing@parkaru15sa.dfs.core.windows.net/'

In [0]:
%sql
-- List the storage credentials available for external locations.
SHOW STORAGE CREDENTIALS;

In [0]:
%fs ls 'abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxi/YellowTaxiTripData.csv'

In [0]:
# Incrementally ingest the yellow-taxi CSV files from the landing container
# using the "cloudFiles" (Auto Loader) streaming source.
dfyellowTaxi = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    # `inferSchema` is the batch CSV reader flag; for Auto Loader the option that
    # requests typed columns (instead of all-string columns) is
    # `cloudFiles.inferColumnTypes`.
    .option("cloudFiles.inferColumnTypes", "true")
    # Location where Auto Loader stores the schema it inferred for this source.
    .option("cloudFiles.schemaLocation", "abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxi/_schema")
    .load('abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxi/')
)

# Preview the incoming stream in the notebook.
display(dfyellowTaxi)

In [0]:
# Stream the ingested rows into the Bronze table.
# The checkpoint location records streaming progress for this query;
# mergeSchema is enabled on the write.
(
    dfyellowTaxi.writeStream
    .option("mergeSchema", "true")
    .option("checkpointLocation", "abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxi/_bronze_checkpoint")
    .toTable("newpavancatalog.bronze.bronze_yellowTaxi")
)

In [0]:
%sql
-- Row count of the Bronze table.
SELECT COUNT(*) FROM newpavancatalog.bronze.bronze_yellowTaxi;
    


In [0]:
# Batch-read the Bronze table, show its rows, then show how many rows it holds.
bronze_yellowTaxi = spark.table("newpavancatalog.bronze.bronze_yellowTaxi")
display(bronze_yellowTaxi)
bronze_row_count = bronze_yellowTaxi.count()
display(bronze_row_count)


In [0]:
# from pyspark.sql.functions import col, countDistinct, count, when

# bronze_yellowTaxi = bronze_yellowTaxi.dropna().dropDuplicates()
# display(bronze_yellowTaxi.count())
# display(bronze_yellowTaxi)

In [0]:
from pyspark.sql.functions import col

# Open the Bronze Delta table as a streaming source
# (a Delta table stream, not an Auto Loader file source).
bronze_yellowTaxi_stream = spark.readStream.format("delta").table(
    "newpavancatalog.bronze.bronze_yellowTaxi"
)

# Cleaning step: drop rows that contain nulls, then drop duplicate rows.
bronze_without_nulls = bronze_yellowTaxi_stream.dropna()
silver_yellowTaxi_stream = bronze_without_nulls.dropDuplicates()

# The cleaned stream is written to the Silver table in the following cell.



In [0]:
# The Silver schema is not created anywhere else in this notebook; create it
# up front (idempotent) so the streaming write works on a fresh catalog.
spark.sql("CREATE SCHEMA IF NOT EXISTS newpavancatalog.silver")

# Append the cleaned stream to the Silver Delta table and keep the query handle.
silver_stream = (
    silver_yellowTaxi_stream.writeStream
        .format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", "abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxi/_silver_checkpoint")  # update your checkpoint path
        .outputMode("append")  # or "complete" if aggregations are involved
        # toTable() starts the query and returns it; same call the Bronze writer uses.
        .toTable("newpavancatalog.silver.silver_yellowTaxi")
)
display(silver_stream)

In [0]:
# `silver_yellowTaxi` was never assigned in this notebook, so displaying it raised
# NameError; read the Silver table as a batch DataFrame first, then display it.
silver_yellowTaxi = spark.read.table("newpavancatalog.silver.silver_yellowTaxi")
display(silver_yellowTaxi)


In [0]:
%sql
-- Row count of the Silver table.
SELECT COUNT(*) FROM newpavancatalog.silver.silver_yellowTaxi;

In [0]:
# Import the functions module under an alias so that Spark's sum/round do not
# shadow the Python builtins of the same name for the rest of the notebook.
from pyspark.sql import functions as F

# Create the Gold schema before the streaming write needs it (idempotent).
spark.sql("CREATE SCHEMA IF NOT EXISTS newpavancatalog.gold")

# Read the Silver table as a streaming source. A separate name is used so the
# `silver_yellowTaxi_stream` consumed by the Silver writer cell is not overwritten.
silver_table_stream = (
    spark.readStream
        .format("delta")
        .table("newpavancatalog.silver.silver_yellowTaxi")
)

# Aggregate trips per (tpep_pickup_datetime, VendorID) and rename the output columns.
gold_yellowTaxi_stream = (
    silver_table_stream
        # NOTE(review): `pickup_date` is derived here, but the grouping below uses the
        # raw `tpep_pickup_datetime`, so this column never reaches the output.
        # Confirm whether grouping by `pickup_date` was intended.
        .withColumn("pickup_date", F.date_format("tpep_pickup_datetime", "dd-MM-yyyy HH:mm"))
        .groupBy("tpep_pickup_datetime", "VendorID")
        .agg(
            F.count("*").alias("total_trips"),
            F.round(F.sum("total_amount"), 2).alias("total_revenue"),
            F.round(F.avg("fare_amount"), 2).alias("avg_fare"),
            F.round(F.avg("tip_amount"), 2).alias("avg_tip")
        )
        .withColumnRenamed("tpep_pickup_datetime", "Pickup_Date")
        .withColumnRenamed("VendorID", "Vendor_ID")
        .withColumnRenamed("total_trips", "Total_Trips")
        .withColumnRenamed("total_revenue", "Total_Revenue")
        .withColumnRenamed("avg_fare", "Average_Fare")
        .withColumnRenamed("avg_tip", "Average_Tip")
)

# Write the aggregated stream to the Gold Delta table.
(
    gold_yellowTaxi_stream.writeStream
        .format("delta")
        .option("checkpointLocation", "abfss://landing@parkaru15sa.dfs.core.windows.net/yellowtaxigold/")  # set your checkpoint path
        .outputMode("complete")  # required for aggregations
        # .trigger(availableNow=True)  # process all current data and stop
        # toTable() starts the query; same call the Bronze writer uses.
        .toTable("newpavancatalog.gold.gold_yellowTaxi")
)


In [0]:
%sql
-- Row count of the Gold table.
SELECT COUNT(*) FROM newpavancatalog.gold.gold_yellowTaxi

In [0]:
# Ensure the Gold schema exists (idempotent).
spark.sql("CREATE SCHEMA IF NOT EXISTS newpavancatalog.gold")

# The Gold table is written by the streaming query that targets
# newpavancatalog.gold.gold_yellowTaxi in "complete" output mode. The former batch
# overwrite used `gold_yellowTaxi_df`, which is not defined anywhere in this
# notebook (NameError) and would have overwritten the table the stream writes to,
# so this cell only reads the table back for inspection.
final_Gold_Table = spark.read.table("newpavancatalog.gold.gold_yellowTaxi")
display(final_Gold_Table.count())
display(final_Gold_Table)

In [0]:
%sql
-- Remove the volume from the current schema.
-- IF EXISTS keeps this cleanup cell re-runnable once the volume is already gone.
DROP VOLUME IF EXISTS operational_data

In [0]:
%sql
-- Select the catalog, make sure the bronze schema exists and is current,
-- then register the landing folder as an external volume (no-op if present).
USE CATALOG newpavancatalog;
CREATE SCHEMA IF NOT EXISTS bronze;
USE newpavancatalog.bronze;
CREATE EXTERNAL VOLUME IF NOT EXISTS landingzone
  LOCATION 'abfss://landing@pavansa28.dfs.core.windows.net/landing'

In [0]:
%sql
-- List the volumes in the current schema.
SHOW VOLUMES

In [0]:
%fs ls /Volumes/newpavancatalog/bronze/landingzone

In [0]:
%sql
-- Switch the session to the bronze schema of newpavancatalog.
USE newpavancatalog.bronze;


In [0]:
%sql
use newpavancatalog.bronze;

-- Expose the Azure SQL `refunds` table through a JDBC-backed table.
-- The statement previously ended at `newpavancatalog.bronze.` with no table name,
-- which is a syntax error; it is named `refunds` to match the source `dbtable`
-- and the follow-up query on newpavancatalog.bronze.refunds.
-- The password is resolved from a Databricks secret instead of being stored in
-- the notebook.
-- TODO: the scope/key names below are placeholders - create the secret (or point
-- at an existing one) and rotate the password that was previously written here.
CREATE TABLE IF NOT EXISTS newpavancatalog.bronze.refunds
USING JDBC
OPTIONS (
  url 'jdbc:sqlserver://pavansqlserver28.database.windows.net:1433;database=pavansqldb28',
  dbtable 'refunds',
  user 'pavanteli',
  password secret('pavan-sql', 'refunds-db-password')
);

In [0]:
# Read the 'refunds' table from Azure SQL Database using JDBC.
# The database password is fetched from a Databricks secret scope rather than
# being hardcoded in the notebook.
# TODO: the scope/key names are placeholders - create the secret (or point at an
# existing one) and rotate the password that was previously written here.
refunds_db_password = dbutils.secrets.get(scope="pavan-sql", key="refunds-db-password")

df_refunds = spark.read.format("jdbc").options(
    url="jdbc:sqlserver://pavansqlserver28.database.windows.net:1433;database=pavansqldb28",
    dbtable="refunds",
    user="pavanteli",
    password=refunds_db_password,
).load()

# Optionally, save as a managed table in Unity Catalog.
# The target uses `newpavancatalog`, the catalog queried by the follow-up
# `select * from newpavancatalog.bronze.refunds` cell (it previously pointed at
# `pavan_catalog_all`).
df_refunds.write.mode("overwrite").saveAsTable(
    "newpavancatalog.bronze.refunds"
)

display(df_refunds)

In [0]:
%sql
-- Inspect the refunds table in the bronze schema.
SELECT * FROM newpavancatalog.bronze.refunds