In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import *
spark=SparkSession.builder.appName("sep 17").getOrCreate()
dbutils.fs.cp("file:/Workspace/Shared/Employee.csv","dbfs:/Filestore/Employee.csv")
dbutils.fs.cp("file:/Workspace/Shared/products.json","dbfs:/Filestore/products.json")
employee_df=spark.read.format("csv").option("header","true").load("dbfs:/Filestore/Employee.csv")
employee_df.show()
products_df=spark.read.format("json").option("multiline","true").load("/content/products.json")
employee_df.write.format("delta").mode("overwrite").save("/delta/employee_delta")
products_df.write.format("delta").mode("overwrite").save("/delta/products_delta")
employee_delta = spark.read.format("delta").load("/delta/employee_delta")
products_delta = spark.read.format("delta").load("/delta/products_delta")

In [0]:

dbutils.fs.cp("File:/Workspace/Shared/new_employee.csv","dbfs:/Filestore/new_employee.csv")
employee_new_df=spark.read.format("csv").option("header","true").load("dbfs:/Filestore/new_employee.csv")
employee_new_df.write.format("delta").mode("overwrite").save("/delta/employee_new_delta")
employee_new_delta=spark.read.format("delta").load("/delta/employee_new_delta")
employee_delta.createOrReplaceTempView("employee_delta")
employee_new_delta.createOrReplaceTempView("new_employee_delta")

In [0]:

spark.sql("""
          merge into employee_delta as target
          using new_employee_delta as source
          on target.employeeID=source.employeeID
          when matched then update set target.Salary=source.Salary
          when not matched then 
           insert (EmployeeID, EmployeeName, JoiningDate, Salary)
           values (source.EmployeeID, source.EmployeeName, source.JoiningDate, source.Salary)
          """)
spark.sql("select * from employee_delta").show()

In [0]:
spark.sql("""
          create table if not exists employee  as select * from employee_delta
          """)
#Optmizing the table using zordering and optimize
spark.sql("optimize employee zorder by(Salary)")

#describing the history of the delta table
spark.sql("DESCRIBE HISTORY employee").show()

#vacuuming the table abd storing data of previous 7 days only
spark.sql("Vacuum employee retain 168 hours")

#using versioning of delta lake to find data with certain version
spark.sql("SELECT * FROM employee VERSION AS OF 3")


In [0]:
dbutils.fs.cp("file:/Workspace/Shared/transaction.csv","dbfs:/Filestore/streaming/input/transaction.csv")
transaction_schema="transactionID String, transactionDate DATE, productID STRING,Quantity INT,Price INT"
static_df = spark.read.format("csv").option("header", "true").load("dbfs:/Filestore/streaming/input/transaction.csv")
schema = static_df.schema
streaming_df = spark.readStream.format("csv").option("header", "true").schema(schema).load("dbfs:/Filestore/streaming/input/")
query = streaming_df.writeStream.format("console").start()

In [0]:
transformed_df = streaming_df.withColumn("TotalAmount", streaming_df["Quantity"] * streaming_df["Price"]).filter(streaming_df["Quantity"] > 1)
query = transformed_df.writeStream.format("memory").queryName("transformed_stream").start()

from pyspark.sql.functions import col, sum

#Group the data by ProductID and calculate the total sales for each product
aggregated_df = streaming_df.groupBy("ProductID").agg(sum(col("Quantity") * col("Price")).alias("TotalSales"))
query = aggregated_df.writeStream.format("console").outputMode("update").start()

In [0]:
query = transformed_df.writeStream.format("parquet").option("path", "/dbfs/FileStore/parquet") \
                                   .option("checkpointLocation", "/dbfs/FileStore/checkpoint") \
                                   .start()
from pyspark.sql.functions import col, to_timestamp

streaming_df = streaming_df.withColumn("TransactionDate", to_timestamp(col("TransactionDate")))
watermarked_df = streaming_df.withWatermark("TransactionDate", "1 day")
watermarked_query = watermarked_df.writeStream.format("console")


In [0]:
# Stream 1: Incoming transaction data (CSV)
transactions_stream = spark.readStream.format("csv") \
    .option("header", "true") \
    .option("basePath", "dbfs:/Filestore/streaming/input/") \
    .schema("TransactionID STRING, TransactionDate DATE, ProductID STRING, Quantity INT, Price DOUBLE") \
    .load("dbfs:/Filestore/streaming/input/")

# Stream 2: Product information (JSON)
products_stream = spark.readStream.format("json") \
    .option("basePath", "dbfs:/Filestore/streaming/input/") \
    .schema("ProductID STRING, ProductName STRING, Category STRING") \
    .load("dbfs:/Filestore/streaming/input/")

# Join both streams on ProductID
joined_stream = transactions_stream.join(products_stream, "ProductID")

# Write the joined stream to the console to visualize results
query = joined_stream.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

In [0]:
dbutils.fs.cp("file:/Workspace/Shared/orders.csv","dbfs:/Filestore/streaming/input/orders.csv")
orders_schema="OrderID String,OrderDate Date,CustomerID String,Product String,Quantity INT,Price INT"
orders_stream=spark.readStream.format("csv").option("header","true").schema(orders_schema).load("dbfs:/Filestore/streaming/input/")



In [0]:
!pip install DLT

Collecting DLT
  Obtaining dependency information for DLT from https://files.pythonhosted.org/packages/8c/42/522adf6823e4b1e9ea4930dc01695e958662b6bb0141d7be47c5f2826441/dlt-1.0.0-py3-none-any.whl.metadata
  Downloading dlt-1.0.0-py3-none-any.whl.metadata (10 kB)
Collecting fsspec>=2022.4.0 (from DLT)
  Obtaining dependency information for fsspec>=2022.4.0 from https://files.pythonhosted.org/packages/1d/a0/6aaea0c2fbea2f89bfd5db25fb1e3481896a423002ebe4e55288907a97a3/fsspec-2024.9.0-py3-none-any.whl.metadata
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Collecting giturlparse>=0.10.0 (from DLT)
  Obtaining dependency information for giturlparse>=0.10.0 from https://files.pythonhosted.org/packages/dd/94/c6ff3388b8e3225a014e55aed957188639aa0966443e0408d38f0c9614a7/giturlparse-0.12.0-py2.py3-none-any.whl.metadata
  Downloading giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from DLT)
  Obtaining dependency information for hexbytes>=0.

In [0]:
from pyspark.sql.functions import col
import dlt

@dlt.table
def orders_transformed():
    df_orders=spark.read.format("csv").option("header","true").load("dbfs:/Filestore/streaming/input/orders.csv")
    df_order=df_orders.withColumn("TotalPrice",col("Quantity")*col("Price")).filter(col("Quantity")>1)
    return df_order

@dlt.table
def orders_streamed():
    return dlt.read_stream("orders_transformed")

Name,Type
OrderID,string
OrderDate,string
CustomerID,string
Product,string
Quantity,string
Price,string
TotalPrice,double


In [0]:
df_orders = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("dbfs:/Filestore/streaming/input/orders.csv")

df_orders.write.format("delta").mode("overwrite").save("/delta/orders")



In [0]:
%sql
CREATE OR REPLACE LIVE TABLE transformed_orders AS
SELECT *, Quantity * Price AS TotalAmount
FROM delta.`/delta/orders`
WHERE Quantity > 1;

In [0]:
# Reading the Delta table in PySpark
df = spark.read.format("delta").load("/delta/orders")
df.show()

In [0]:
%sql
-- Reading the Delta table using SQL
SELECT * FROM delta.`/delta/orders`;

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

# Load the Delta table
delta_table = DeltaTable.forPath(spark, "/delta/orders")

# Update the Price of 'Laptop' by increasing it by 10%
delta_table.update(
    condition = expr("Product = 'Laptop'"),
    set = { "Price": expr("Price * 1.1") }
)


In [0]:
l
-- Update product price using SQL (increase Laptop price by 10%)
UPDATE delta.`/delta/orders`
SET Price = Price * 1.1
WHERE Product = 'Laptop';


In [0]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Insert a new record into the Delta table
new_data = [(106, 'C006', 'Keyboard', 2, 50)]
columns = ["OrderID", "CustomerID", "Product", "Quantity", "Price"]

# Define the schema of the Delta table
schema = StructType([
    StructField("OrderID", IntegerType(), nullable=False),
    StructField("CustomerID", StringType(), nullable=False),
    StructField("Product", StringType(), nullable=False),
    StructField("Quantity", IntegerType(), nullable=False),
    StructField("Price", IntegerType(), nullable=False)
])

# Create a DataFrame with new data and specified schema
new_df = spark.createDataFrame(new_data, schema)

# Append the new data to the Delta table
new_df.write.format("delta").mode("append").save("/delta/orders")

In [0]:
df_version = spark.read.format("delta").option("versionAsOf", 1).load("/delta/orders")
df_version.show()

# Query the table as it existed at a specific timestamp
df_timestamp = spark.read.format("delta").option("timestampAsOf", "2024-09-17T04:41:20Z").load("/delta/orders")
df_timestamp.show()

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# Define the schema of your Parquet file
schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderDate", DateType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", DoubleType(), True),
])

parquet_path = "/dbfs/FileStore/"
delta_path = "/dbfs/FileStore/delta_table"

# Read Parquet data with the defined schema
df_parquet = spark.read.format("parquet").schema(schema).load(parquet_path)

# Write the data in Delta format
df_parquet.write.format("delta").mode("overwrite").save(delta_path)

# Register the Delta table
spark.sql(f"CREATE TABLE delta_table USING DELTA LOCATION '{delta_path}'")

# Query the newly converted Delta table
df_converted = spark.sql("SELECT * FROM delta_table")
df_converted.show()
