In [0]:
# Show the notebook-provided `spark` session object (print() renders it via str()).
print(str(spark))

In [0]:
# List the entries directly under /Workspace/ (no scheme given, so dbutils.fs resolves it on DBFS).
workspace_listing = dbutils.fs.ls("/Workspace/")
display(workspace_listing)

# Databricks File System (DBFS)

In [0]:
# Create a scratch directory, then write (or overwrite) a small text file inside it.
dbutils.fs.mkdirs("/Workspace/foobar")
dbutils.fs.put("/Workspace/foobar/baz.txt", "Hello World!", overwrite=True)

# To inspect the result interactively:
#   dbutils.fs.ls("/Workspace/foobar/")           -> list the directory
#   dbutils.fs.head("/Workspace/foobar/baz.txt")  -> read the file's first bytes

In [0]:
# Create the DataFrame used for the Parquet demos (dataframe1) with columns name, dept and age.
data = [
    ("James", "Sales", 34),
    ("Michael", "Sales", 56),
    ("Robert", "Sales", 30),
    ("Maria", "Finance", 24),
]
columns = ["name", "dept", "age"]

dataframe1 = spark.createDataFrame(data=data, schema=columns)
dataframe1.printSchema()

In [0]:
# Write dataframe1 as Parquet, replacing any earlier output at this path.
dataframe1.write.parquet("dbfs:/Workspace/tmp/output/people3.parquet", mode="overwrite")

In [0]:
# Inspect the files Spark produced for people3.parquet (it is a directory, not a single file).
people3_files = dbutils.fs.ls("dbfs:/Workspace/tmp/output/people3.parquet/")
display(people3_files)

In [0]:
# Write dataframe1 partitioned by age: Spark creates one age=<value>/ sub-directory per
# distinct age value. "overwrite" keeps the cell idempotent; with "append", every re-run
# would add another copy of the same rows to each partition.
dataframe1.write.mode("overwrite").partitionBy("age").parquet("dbfs:/Workspace/tmp/output/first.parquet")

In [0]:
# Look inside one partition directory (age=24) of the partitioned output.
# Listing "dbfs:/Workspace/tmp/output/first.parquet/" instead shows every age=<value>/ folder.
age_24_files = dbutils.fs.ls("dbfs:/Workspace/tmp/output/first.parquet/age=24/")
display(age_24_files)

In [0]:
# Read the people3.parquet output back into a DataFrame.
df = spark.read.parquet("dbfs:/Workspace/tmp/output/people3.parquet/")
# NOTE: people3.parquet is written without partitionBy, so it has no age=24/ sub-folder and
# "people3.parquet/age=24/" cannot be read. The age-partitioned copy is first.parquet.
# To read a single partition of it while keeping the `age` column, point basePath at the
# table root:
#   spark.read.option("basePath", "dbfs:/Workspace/tmp/output/first.parquet/") \
#       .parquet("dbfs:/Workspace/tmp/output/first.parquet/age=24/")
display(df)

In [0]:
# Append a second batch of rows to the existing people3.parquet output.
data1 = [
    ("James1", "Sales1", 34),
    ("Michael1", "Sales", 56),
    ("Robert1", "Sales1", 30),
    ("Maria1", "Finance", 24),
]
columns1 = ["name", "dept", "age"]
dataframe2 = spark.createDataFrame(data=data1, schema=columns1)

# "append" keeps the rows already stored at this path and adds these ones.
dataframe2.write.parquet("dbfs:/Workspace/tmp/output/people3.parquet/", mode="append")

In [0]:
# Build a small movie-ratings DataFrame and save it as Avro, partitioned by year then month.
data2 = [
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "git", 2.0),
]
columns = ["year", "month", "title", "rating"]
df = spark.createDataFrame(data=data2, schema=columns)

avro_writer = df.write.mode("overwrite").partitionBy("year", "month").format("avro")
avro_writer.save("dbfs:/Workspace/tmp/output/tmp/test_Dataset01")

In [0]:
# List the top level of the partitioned Avro output (one year=<value>/ folder per year).
avro_listing = dbutils.fs.ls("dbfs:/Workspace/tmp/output/tmp/test_Dataset01")
display(avro_listing)

In [0]:
# Read the partitioned Avro data back; year and month come from the partition folder names.
df = (
    spark.read
    .format("avro")
    .load("dbfs:/Workspace/tmp/output/tmp/test_Dataset01")
)
display(df)

In [0]:
# Build a small category/value table from a list of dicts and save it as ORC.
data = [
    {'col1': 'Category A', 'col2': 100},
    {'col1': 'Category B', 'col2': 200},
    {'col1': 'Category C', 'col2': 300},
]

# Turn the list into a DataFrame first. Without this step `df` would still be the
# movie-ratings DataFrame from the Avro cells, and that is what would end up in the ORC file.
categories_df = spark.createDataFrame(data)

# Save as ORC, replacing any earlier output at this path.
categories_df.write.format('orc').mode('overwrite').save(
    'dbfs:/Workspace/tmp/FileStore/tables/userdata1_orc')

In [0]:
# Read the ORC output back. ORC files carry their own schema, so CSV-style options
# (inferSchema, header, delimiter) have no effect here and are not passed.
file_location = "dbfs:/Workspace/tmp/FileStore/tables/userdata1_orc"
file_type = "orc"

df = spark.read.format(file_type).load(file_location)

df.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

# Baseline without cache: nothing is cached, so each action re-runs the whole plan.
# Small demo dataset (10,000 rows); increase the range to make timing differences visible.
df = spark.range(0, 10_000)
df_transformed = df.withColumn("square", df["id"] * df["id"])

# First action: count(). perf_counter() is a monotonic, high-resolution clock intended for
# measuring durations; time.time() follows the wall clock and can jump if it is adjusted.
start = time.perf_counter()
df_transformed.count()
print("Without Cache - First count done in:", time.perf_counter() - start, "seconds")

# Second action: sum() of the squares, recomputed from scratch.
start = time.perf_counter()
df_transformed.agg(F.sum("square")).show()
print("Without Cache - Second aggregation done in:", time.perf_counter() - start, "seconds")

In [0]:
# With cache: mark df_transformed for caching so the second action can reuse the data
# materialized by the first one.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

# Same small demo dataset (10,000 rows) as the no-cache cell.
df = spark.range(0, 10_000)
df_transformed = df.withColumn("square", df["id"] * df["id"])

# cache() is lazy: it only marks the DataFrame; data is stored when the next action runs.
df_transformed.cache()

# First action: count() both computes the data and fills the cache, so it is usually slower.
# perf_counter() is the monotonic clock intended for measuring elapsed time.
start = time.perf_counter()
df_transformed.count()
print("With Cache - First count done in:", time.perf_counter() - start, "seconds")

# Second action: sum() reads the cached data instead of recomputing it.
start = time.perf_counter()
df_transformed.agg(F.sum("square")).show()
print("With Cache - Second aggregation done in:", time.perf_counter() - start, "seconds")

In [0]:

# Show the cache flag, release the cached data, then confirm the flag was cleared.
# (A bare `df_transformed.is_cached` in the middle of a cell is evaluated but never displayed,
# because a notebook only renders the cell's last expression.)
print("is_cached before unpersist:", df_transformed.is_cached)
df_transformed.unpersist()
print("is_cached after unpersist:", df_transformed.is_cached)

## Practice Question:

In [0]:
# ---------------- Customers ----------------
customer_columns = ["customer_id", "name", "city", "status"]
customers_data = [
    (1, "Amit", "Delhi", "Active"),
    (2, "Neha", "Mumbai", "Active"),
    (3, "Raj", "Delhi", "Inactive"),
    (4, "Simran", "Bangalore", "Active"),
    (5, "John", "Chennai", "Active"),
]
customers = spark.createDataFrame(customers_data, customer_columns)

# ---------------- Orders ----------------
# Order 102 has a missing amount and order 103 a negative one; Step 3 cleans both.
order_columns = ["order_id", "customer_id", "amount", "order_date"]
orders_data = [
    (101, 1, 1000.0, "2024-01-01"),
    (102, 2, None, "2024-01-02"),
    (103, 1, -50.0, "2024-01-03"),
    (104, 3, 500.0, "2024-01-04"),
    (105, 4, 700.0, "2024-01-05"),
    (106, 5, 1200.0, "2024-01-06"),
]
orders = spark.createDataFrame(orders_data, order_columns)

In [0]:
# Save copies of both datasets as CSV with a header row.
# Deliberately NOT partitioned by customer_id / order_id: both are unique keys, so
# partitioning on them would create one directory holding a one-row file per record.
DATA_DIR = "/Workspace/Users/d30930606@gmail.com/Databricks/data"

customers.write.mode("overwrite").option("header", True).format("csv").save(f"{DATA_DIR}/customers")
orders.write.mode("overwrite").option("header", True).format("csv").save(f"{DATA_DIR}/orders")

In [0]:
dbutils.fs.put("/Workspace/Users/d30930606@gmail.com/Databricks/data/customers.csv", "", True)
dbutils.fs.put("/Workspace/Users/d30930606@gmail.com/Databricks/data/orders.csv", "", True)

customers.write.csv("/Workspace/Users/d30930606@gmail.com/Databricks/data/customers.csv", mode="overwrite")
orders.write.csv("/Workspace/Users/d30930606@gmail.com/Databricks/data/orders.csv", mode="overwrite")

In [0]:
# 4
dbutils.fs.mkdirs("/Workspace/Users/d30930606@gmail.com/Databricks/data/raw")

In [0]:
# 5. Read the stored files again into new DataFrames

customers_new = spark.read.csv("/Workspace/Users/d30930606@gmail.com/Databricks/data/customers.csv", schema=customers.schema)
orders_new = spark.read.csv("/Workspace/Users/d30930606@gmail.com/Databricks/data/orders.csv", schema=orders.schema)

# 6. Display the DataFrames
display(customers_new)
display(orders_new)

### STEP 2 — EXTRACT TASK (Instructions Only)

1. Store both datasets in DBFS as raw files.
2. Save the customers dataset in CSV format with header.
3. Save the orders dataset in CSV format with header.
4. Create a raw data folder structure inside DBFS.
5. Read the stored files again into new DataFrames.

### STEP 3 — TRANSFORM TASK (Instructions Only)

Perform the following transformations:

1. Clean the amount column:
    - Replace NULL values with 0.
    - Replace negative values with 10.
    - Keep remaining values unchanged.

2. Create a new column named final_amount by adding 18% GST to the cleaned amount (final_amount = amount × 1.18).
3. Filter only customers whose status is Active.
4. Join the orders data with the active customers on customer_id.
5. Select only meaningful business columns:
    - customer name
    - city
    - order_id
    - final_amount
    - order_date

6. Calculate the total order amount per customer.
7. Rank customers within each city by final_amount.

In [0]:
# STEP-3: TRANSFORMATIONS

from pyspark.sql import functions as F


# 1. Clean only the amount column: NULL -> 0, negative -> 10, everything else unchanged.
# A bare orders_new.fillna(0) would also fill NULLs in order_id / customer_id, so the
# replacement is scoped to `amount` with a single when/otherwise chain.
orders_transformed = orders_new.withColumn(
    "amount",
    F.when(F.col("amount").isNull(), F.lit(0.0))
    .when(F.col("amount") < 0, F.lit(10.0))
    .otherwise(F.col("amount")),
)
display(orders_transformed)

