## 1. Write SQL statements to create:
1. A catalog named telecom_catalog_assign
2. A schema landing_zone
3. A volume landing_vol
4. Using dbutils.fs.mkdirs, create folders:<br>
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/
/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/
5. Explain the difference between the following (look up why Volumes are preferred for production-ready systems):<br>
a. Volume vs DBFS/FileStore<br>
b. Why production teams prefer Volumes for regulated data<br>


In [0]:
%sql
CREATE CATALOG IF NOT EXISTS telecom_catalog_assign;
CREATE SCHEMA IF NOT EXISTS telecom_catalog_assign.landing_zone;
CREATE VOLUME IF NOT EXISTS telecom_catalog_assign.landing_zone.landing_vol;

In [0]:
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/")
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/")
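`dbutils.fs.mkdirs` creates any missing parent folders and does not fail if the folder already exists, so the three calls above are safe to re-run. A minimal local analogy with Python's `pathlib`, assuming a temporary directory stands in for the Volume root (it does not touch Databricks):

```python
import tempfile
from pathlib import Path

# Hypothetical local stand-in for /Volumes/telecom_catalog_assign/landing_zone/landing_vol
volume_root = Path(tempfile.mkdtemp()) / "telecom_catalog_assign" / "landing_zone" / "landing_vol"

for folder in ["customer", "usage", "tower"]:
    # parents=True creates missing parent folders; exist_ok=True makes re-runs harmless,
    # similar to how dbutils.fs.mkdirs behaves
    (volume_root / folder).mkdir(parents=True, exist_ok=True)

# Running the same loop a second time does not raise
for folder in ["customer", "usage", "tower"]:
    (volume_root / folder).mkdir(parents=True, exist_ok=True)

created = sorted(p.name for p in volume_root.iterdir() if p.is_dir())
print(created)  # ['customer', 'tower', 'usage']
```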

Telecom Domain Read & Write Ops Assignment - Building Datalake & Lakehouse
This notebook contains assignments to practice Spark read options and Databricks volumes.
Sections: Sample data creation, Catalog & Volume creation, Copying data into Volumes, Path glob/recursive reads, toDF() column renaming variants, inferSchema/header/separator experiments, and exercises.

## Data files to use in this use case:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''
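The three datasets use different layouts, and the later `sep`/`header` read options have to match them: the customer data is comma-separated with no header row, the usage data is tab-separated with a header, and the tower logs are pipe-separated with a header. A stdlib-only sketch (no Spark) that parses each string the way those options describe; the `parse` helper is hypothetical, and the `_c0, _c1, ...` default names mirror what Spark assigns when `header` is false:

```python
import csv
import io

customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''

def parse(text, sep, header):
    """Split text into rows using sep; return (column_names, data_rows)."""
    rows = list(csv.reader(io.StringIO(text.strip()), delimiter=sep))
    if header:
        return rows[0], rows[1:]
    # No header line: generate default names, like Spark's _c0, _c1, ...
    return [f"_c{i}" for i in range(len(rows[0]))], rows

for name, text, sep, header in [("customer", customer_csv, ",", False),
                                ("usage", usage_tsv, "\t", True),
                                ("tower", tower_logs_region1, "|", True)]:
    cols, rows = parse(text, sep, header)
    print(name, cols, len(rows), "rows")
```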


## 2. Filesystem operations
1. Write dbutils.fs code to copy the above datasets into your created Volume folders:
Customer → /Volumes/.../customer/
Usage → /Volumes/.../usage/
Tower (region-based) → /Volumes/.../tower/region1/ and /Volumes/.../tower/region2/

2. Write a command to validate whether files were successfully copied

In [0]:
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv", customer_csv, True)

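# dbutils.fs.ls raises an exception when the path does not exist, so catch it instead of testing the result
try:
    dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv")
    print("Success: File copied.")
except Exception:
    print("Error: File not found.")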

# Keep one record per line; a TSV written on a single line would be read back as one row
usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count
101\t320\t1500\t20
102\t120\t4000\t5
103\t540\t600\t52
104\t45\t200\t2
105\t0\t0\t0
'''
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage.tsv", usage_tsv, True)

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp
5001|101|TWR01|-80|2025-01-10 10:21:54
5004|104|TWR05|-75|2025-01-10 11:01:12
'''
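# Region-based layout from the task: tower/region1/ (mkdirs is a no-op if the folder already exists)
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/")
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/region1/tower_logs_region1.csv", tower_logs_region1, True)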
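Task 2 asks for a validation step for every copied file. Because `dbutils.fs.ls` raises an exception for a missing path instead of returning an empty list, one check per expected file is more reliable than a single `if`. A local stdlib analogy, assuming a temporary directory stands in for the Volume and using shortened sample contents; the `validate` helper and the `region2` file name are hypothetical (the region2 file is never written, to show the MISSING branch). On Databricks the same loop would call `dbutils.fs.ls` inside `try`/`except`:

```python
import tempfile
from pathlib import Path

# Hypothetical local stand-in for the Volume root
root = Path(tempfile.mkdtemp())

# Expected layout, following the task (tower data goes into a region subfolder)
expected = {
    "customer/customer.csv": "101,Arun,31,Chennai,PREPAID\n",
    "usage/usage.tsv": "customer_id\tvoice_mins\tdata_mb\tsms_count\n101\t320\t1500\t20\n",
    "tower/region1/tower_logs_region1.csv": "event_id|customer_id|tower_id|signal_strength|timestamp\n",
}

for rel_path, content in expected.items():
    target = root / rel_path
    target.parent.mkdir(parents=True, exist_ok=True)  # like dbutils.fs.mkdirs
    target.write_text(content)                        # like dbutils.fs.put(..., True)

def validate(root, rel_paths):
    """Map each relative path to True if it exists as a non-empty file."""
    return {p: (root / p).is_file() and (root / p).stat().st_size > 0 for p in rel_paths}

# Hypothetical region2 file that was never written
report = validate(root, list(expected) + ["tower/region2/tower_logs_region2.csv"])
for path, ok in report.items():
    print(("OK      " if ok else "MISSING ") + path)
```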

## 3. Spark Directory Read Use Cases
1. Read all tower logs using:
Path glob filter (example: *.csv)
Multiple paths input
Recursive lookup

2. Demonstrate these 3 reads separately:
Using pathGlobFilter
Using list of paths in spark.read.csv([path1, path2])
Using .option("recursiveFileLookup","true")

3. Compare the outputs and understand when each should be used.
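To compare the three approaches, it helps to look only at which files each one selects. A stdlib sketch under stated assumptions: a temporary `tower/` folder with hypothetical `region1`/`region2` files and a stray `notes.txt`; the helper names are made up, and the sketch models file selection only, not Spark's listing, partition discovery, or schema handling:

```python
import fnmatch
import tempfile
from pathlib import Path

# Hypothetical local layout mirroring tower/region1 and tower/region2
tower = Path(tempfile.mkdtemp()) / "tower"
for rel in ["region1/tower_logs_region1.csv",
            "region2/tower_logs_region2.csv",
            "region2/notes.txt"]:
    f = tower / rel
    f.parent.mkdir(parents=True, exist_ok=True)
    f.write_text("event_id|customer_id|tower_id|signal_strength|timestamp\n")

def glob_filter(folder, pattern):
    """Files directly inside `folder` whose file name matches `pattern`
    (pathGlobFilter also matches on the file name)."""
    return sorted(p.name for p in folder.iterdir()
                  if p.is_file() and fnmatch.fnmatch(p.name, pattern))

def explicit_paths(paths):
    """Exactly the files listed, nothing more (like spark.read.csv([path1, path2]))."""
    return sorted(p.name for p in paths if p.is_file())

def recursive_lookup(folder, pattern="*"):
    """Every file at any depth below `folder` (like recursiveFileLookup=true),
    optionally narrowed by a file-name pattern."""
    return sorted(p.name for p in folder.rglob("*")
                  if p.is_file() and fnmatch.fnmatch(p.name, pattern))

print(glob_filter(tower / "region2", "*.csv"))
print(explicit_paths([tower / "region1/tower_logs_region1.csv",
                      tower / "region2/tower_logs_region2.csv"]))
print(recursive_lookup(tower))
print(recursive_lookup(tower, "*.csv"))
```

In short: `pathGlobFilter` skips unwanted file types inside a folder, an explicit list fits when you know exactly which files belong to the load, and `recursiveFileLookup` fits when files are spread across nested folders (it also disables partition discovery).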

In [0]:
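# Read every tower log file: pathGlobFilter keeps only file names ending in .csv,
# and recursiveFileLookup also picks up files inside sub-folders such as region1/
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("sep", "|")
      .option("pathGlobFilter", "*.csv")
      .option("recursiveFileLookup", "true")
      .load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower"))
display(df)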

In [0]:
# No header row in the customer data, so name the columns with toDF()
df = (spark.read
      .options(header=False, inferSchema=True)
      .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer")
      .toDF("customer_id", "name", "age", "city", "plan"))
display(df)
print(df.count())
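With `inferSchema=True`, Spark scans the values of each column and picks a type that fits all of them, so the single `abc` in row 106 is enough to make `age` a string column. A simplified stdlib sketch of that idea; `is_int` and `infer_column_types` are hypothetical helpers, and Spark's real inference also considers long, double, boolean, timestamp and other types:

```python
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

def is_int(value):
    try:
        int(value)
        return True
    except ValueError:
        return False

def infer_column_types(rows):
    """Simplified inference: a column is 'int' only if every non-empty value
    parses as an integer; otherwise it falls back to 'string'."""
    types = []
    for col in zip(*rows):
        values = [v for v in col if v != ""]  # empty fields are treated as null
        types.append("int" if values and all(is_int(v) for v in values) else "string")
    return types

rows = [line.split(",") for line in customer_csv.strip().splitlines()]
names = ["customer_id", "name", "age", "city", "plan"]  # same names as the toDF(...) call
inferred = dict(zip(names, infer_column_types(rows)))
print(inferred)
# {'customer_id': 'int', 'name': 'string', 'age': 'string', 'city': 'string', 'plan': 'string'}
```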

In [0]:
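# Read two customer files separately, give them the same column names, and stack them by name
df1 = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv").toDF("customer_id", "name", "age", "city", "plan")
df2 = spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_1.csv").toDF("customer_id", "name", "age", "city", "plan")
combined_df = df1.unionByName(df2)
# display(combined_df)

# Write df2 back out with a header and "|" as the separator.
# Note: save() creates a folder named customer_2.csv containing part-* files, not a single file,
# and because it sits inside customer/, later directory-level reads of customer/ may pick it up.
(df2.write
  .format("csv")
  .option("header", "true")
  .option("sep", "|")
  .mode("overwrite")
  .save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_2.csv"))

# Read it back with the same separator and header setting used for the write
df3 = (spark.read
       .option("sep", "|")
       .option("header", "true")
       .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_2.csv"))
df3.show(3)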
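A file written with a header line must also be read with `header` set to true; otherwise the header comes back as an ordinary data row. A stdlib sketch of that round trip, using `|` as the separator and two rows from the customer sample (it does not model Spark's part files or output folder):

```python
import csv
import io

rows = [["101", "Arun", "31", "Chennai", "PREPAID"],
        ["102", "Meera", "45", "Bangalore", "POSTPAID"]]
columns = ["customer_id", "name", "age", "city", "plan"]

# Write with a header line and "|" as the separator
buffer = io.StringIO()
writer = csv.writer(buffer, delimiter="|", lineterminator="\n")
writer.writerow(columns)
writer.writerows(rows)
text = buffer.getvalue()

# Read back WITHOUT treating the first line as a header: it shows up as data
without_header = list(csv.reader(io.StringIO(text), delimiter="|"))

# Read back WITH a header: the first line becomes the column names
with_header = list(csv.DictReader(io.StringIO(text), delimiter="|"))

print(len(without_header), without_header[0])  # 3 ['customer_id', 'name', 'age', 'city', 'plan']
print(len(with_header), with_header[0]["name"])  # 2 Arun
```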


In [0]:
dbutils.help()


In [0]:
# Option 1: define the schema as a DDL-formatted string.
# It is compact, but a StructType (Option 2, next cell) is easier to build programmatically and reuse.
schema_Stru = "customer_id INT, name STRING, age INT, city STRING, plan STRING"
df = (spark.read
      .schema(schema_Stru)
      .csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv"))
display(df)
df.show(3)
 

In [0]:
# Option 2 (recommended): define the schema with StructType.
# Pattern: StructType([StructField("colname", DataType(), nullable), ...])
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

str1_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("plan", StringType(), True)])  # include all five columns of customer.csv
dfstru = spark.read.schema(str1_schema).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer.csv")
dfstru.printSchema()
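With an explicit integer type for `age`, Spark's default `PERMISSIVE` mode does not fail on row 106: it sets the value that cannot be parsed (`abc`) to null and keeps the rest of the row, and an unquoted empty field such as the missing name in row 105 is also read as null by default. A stdlib sketch of that casting rule; `cast_row` is hypothetical and ignores Spark's other modes (`DROPMALFORMED`, `FAILFAST`) and the corrupt-record column:

```python
customer_csv = '''
101,Arun,31,Chennai,PREPAID
102,Meera,45,Bangalore,POSTPAID
103,Irfan,29,Hyderabad,PREPAID
104,Raj,52,Mumbai,POSTPAID
105,,27,Delhi,PREPAID
106,Sneha,abc,Pune,PREPAID
'''

# (name, caster) pairs for the five columns of the customer data
schema = [("customer_id", int), ("name", str), ("age", int), ("city", str), ("plan", str)]

def cast_row(fields, schema):
    """Cast each field to its declared type; empty or unparsable values become None."""
    row = {}
    for value, (name, caster) in zip(fields, schema):
        if value == "":
            row[name] = None            # empty field -> null
            continue
        try:
            row[name] = caster(value)
        except ValueError:
            row[name] = None            # e.g. "abc" for an int column -> null
    return row

rows = [cast_row(line.split(","), schema) for line in customer_csv.strip().splitlines()]
for r in rows[-2:]:
    print(r)
# {'customer_id': 105, 'name': None, 'age': 27, 'city': 'Delhi', 'plan': 'PREPAID'}
# {'customer_id': 106, 'name': 'Sneha', 'age': None, 'city': 'Pune', 'plan': 'PREPAID'}
```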


**Reading data from JSON**

In [0]:
# allowComments lets the reader skip Java/C++ style comments inside the JSON file
df_json = spark.read.option("allowComments", "true").json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/JSON FIle/SampleJson.txt")
display(df_json.limit(10))
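By default Spark's JSON reader expects JSON Lines: one complete JSON object per line (set `multiLine` to true for a document that spans several lines). `allowComments` additionally tolerates Java/C++ style comments. The content of `SampleJson.txt` is not shown here, so the sketch below uses hypothetical records, and its hypothetical `read_json_lines` helper only skips whole-line `//` comments, a much narrower rule than Spark's:

```python
import json

# Hypothetical JSON Lines content (not the real SampleJson.txt)
sample = '''// customer records exported from the landing zone
{"customer_id": 101, "name": "Arun", "plan": "PREPAID"}
{"customer_id": 102, "name": "Meera", "plan": "POSTPAID"}
// trailing comment line
'''

def read_json_lines(text, allow_comments=False):
    """Parse one JSON object per line; optionally skip lines that are only a // comment."""
    records = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        if allow_comments and stripped.startswith("//"):
            continue
        records.append(json.loads(stripped))
    return records

records = read_json_lines(sample, allow_comments=True)
print([r["name"] for r in records])  # ['Arun', 'Meera']

try:
    read_json_lines(sample)  # without the option, the comment line is not valid JSON
except json.JSONDecodeError as err:
    print("parse error:", err.msg)
```

Here the failure surfaces as an exception; Spark's default `PERMISSIVE` mode would instead put the unparsable line into a `_corrupt_record` column.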
 


In [0]:
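# Note: "escape" is a CSV option; the JSON reader does not use it.
# For backslash escapes in JSON, see the allowBackslashEscapingAnyCharacter option.
df_json = (spark.read
           .option("allowComments", "true")
           .option("escape", "~")
           .json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/JSON FIle/SampleJson_2.txt"))
display(df_json)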