### Write SQL statements

In [0]:
%sql
create catalog if not exists telecom_catalog_assign

In [0]:
%sql
create schema if not exists telecom_catalog_assign.landing_zone

In [0]:
%sql
create volume if not exists telecom_catalog_assign.landing_zone.landing_vol

In [0]:
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/")

In [0]:
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/")

In [0]:
dbutils.fs.mkdirs("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/")

### Data files to use in this use case

In [0]:
customer_csv = ''' 101,Arun,31,Chennai,PREPAID 
102,Meera,45,Bangalore,POSTPAID 
103,Irfan,29,Hyderabad,PREPAID 
104,Raj,52,Mumbai,POSTPAID 
105,,27,Delhi,PREPAID 
106,Sneha,abc,Pune,PREPAID '''

usage_tsv = '''customer_id\tvoice_mins\tdata_mb\tsms_count 
101\t320\t1500\t20 
102\t120\t4000\t5 
103\t540\t600\t52 
104\t45\t200\t2 
105\t0\t0\t0 '''

tower_logs_region1 = '''event_id|customer_id|tower_id|signal_strength|timestamp 
5001|101|TWR01|-80|2025-01-10 10:21:54 
5004|104|TWR05|-75|2025-01-10 11:01:12 '''

tower_logs_region2 = '''event_id|customer_id|tower_id|signal_strength|timestamp 
5002|102|TWR02|-80|2025-01-10 10:21:54 
5003|103|TWR03|-75|2025-01-10 11:01:12 '''

### Filesystem operations

In [0]:
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv",customer_csv,overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",usage_tsv,overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.csv",tower_logs_region1,overwrite=True)
dbutils.fs.put("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region2.csv",tower_logs_region2,overwrite=True)

In [0]:
%sh

ls "/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/"

### Spark Directory Read Use Cases

In [0]:
df1=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/", header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
df1.show()
df1.printSchema()
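`pathGlobFilter` is tested against file names only, not against directory names. `recursiveFileLookup=True` walks the subdirectories and disables partition inference. The plain-Python sketch below models the filename filter. It assumes `fnmatch` glob semantics approximate Hadoop's for a simple `*` pattern. The directory listing, including the part-file name, is hypothetical. It shows why output part files written under `tower/` by later cells are not picked up by `tower*.csv`.

```python
from fnmatch import fnmatchcase
from posixpath import basename


def glob_filter(paths, pattern):
    """Keep only paths whose final file name matches the glob pattern.

    Simplified model of pathGlobFilter: the pattern is matched against the
    file name, never against the directory part of the path.
    """
    return [p for p in paths if fnmatchcase(basename(p), pattern)]


# Hypothetical listing of the landing volume after later write cells have run
listing = [
    "landing_vol/customer/customer_csv.csv",
    "landing_vol/usage/usage_tsv.csv",
    "landing_vol/tower/tower_logs_region1.csv",
    "landing_vol/tower/tower_logs_region2.csv",
    "landing_vol/tower/tower_csvout/part-00000.csv",  # hypothetical part-file name
]

selected = glob_filter(listing, "tower*.csv")
print(selected)
```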

In [0]:
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
df2.show()

In [0]:
df3=spark.read.format("csv").option("header",True).option("sep","|").option("inferSchema",True).option("pathGlobFilter","tower*.csv").option("RecursiveFileLookup",True).load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/")
df3.show()
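In Spark 3.x, data source option keys are matched case-insensitively, so `.option("RecursiveFileLookup", True)` behaves the same as `.option("recursiveFileLookup", True)`. The sketch below models that option store. `CaseInsensitiveOptions` is a hypothetical class, not a Spark API. It lower-cases keys, and it turns Python booleans into lower-case strings the way PySpark does before handing option values to the JVM.

```python
class CaseInsensitiveOptions:
    """Hypothetical stand-in for a reader's option store.

    Keys are normalized to lower case; booleans become "true"/"false" strings.
    """

    def __init__(self):
        self._options = {}

    def option(self, key, value):
        if isinstance(value, bool):
            value = str(value).lower()
        self._options[key.lower()] = str(value)
        return self  # returning self allows chained .option() calls

    def get(self, key, default=None):
        return self._options.get(key.lower(), default)


opts = CaseInsensitiveOptions().option("header", True).option("RecursiveFileLookup", True)
print(opts.get("recursiveFileLookup"))  # true
```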

In [0]:
df4=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=True, inferSchema=True)
df4.show()

### Schema Inference, Header, and Separator

In [0]:
df4=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False)
df4.show()

In [0]:
df5= spark.read.format("csv").option("header",True).option("inferSchema",True).load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv")
df5.show()

In [0]:
df5= spark.read.format("csv").options(header=True,inferSchema=True).load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv")
df5.show()

**Write a note: what changes when header or inferSchema is set to true or false?**<br>
- With header=True, Spark uses the first line of the file as column names. Because customer_csv has no header row, its first data record (101, Arun, 31, ...) becomes the column names and no longer appears as a data row. With inferSchema=True, Spark makes an extra pass over the data (the whole file by default) to infer each column's type. Because the age column contains the non-numeric value "abc", it is inferred as StringType.
- With header=False, Spark assigns default column names (_c0, _c1, _c2, _c3, _c4) and treats the first line of the file as an ordinary data row. With inferSchema=False, no type inference is performed, so every column is read as StringType.
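The effect of the header flag on column names can be modeled in a few lines of plain Python. `read_lines` is a hypothetical helper, not Spark's CSV parser. The sample lines are taken from customer_csv with the surrounding whitespace removed.

```python
def read_lines(lines, header, sep=","):
    """Model of the header flag: returns (column_names, data_rows)."""
    rows = [line.split(sep) for line in lines]
    if header:
        # The first line is consumed as column names and is no longer data
        return rows[0], rows[1:]
    # No header: default names _c0, _c1, ... and every line stays a data row
    return [f"_c{i}" for i in range(len(rows[0]))], rows


sample = ["101,Arun,31,Chennai,PREPAID", "102,Meera,45,Bangalore,POSTPAID"]
names_true, rows_true = read_lines(sample, header=True)
names_false, rows_false = read_lines(sample, header=False)
print(names_true)   # ['101', 'Arun', '31', 'Chennai', 'PREPAID']
print(names_false)  # ['_c0', '_c1', '_c2', '_c3', '_c4']
```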

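**How did schema inference handle “abc” in age?**<br>

Schema inference gives each column the narrowest type that fits all of its non-null values. Because "abc" cannot be parsed as a number, the age column cannot be an integer type. Spark therefore falls back to StringType, and the valid ages are kept as strings as well.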
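The type-widening rule described above can be sketched in plain Python. This is a deliberately simplified model: it knows only int, double and string. Spark's real CSV inference also considers long, decimal, boolean and timestamp types. `value_type` and `infer_column_type` are hypothetical names.

```python
def value_type(value):
    """Narrowest type a single non-empty string can be parsed as (simplified)."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


WIDENING_ORDER = ["int", "double", "string"]


def infer_column_type(values):
    """Widen the column type until it fits every non-null value."""
    inferred = None
    for value in values:
        if value is None or value == "":
            continue  # empty fields are treated as nulls and do not influence the type
        t = value_type(value)
        if inferred is None or WIDENING_ORDER.index(t) > WIDENING_ORDER.index(inferred):
            inferred = t
    return inferred or "string"  # an all-null column falls back to string


ages = ["31", "45", "29", "52", "27", "abc"]
print(infer_column_type(ages))       # string
print(infer_column_type(ages[:-1]))  # int
```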

In [0]:
#Apply column names to the customer data using the toDF() function
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.show()


### Column Renaming Use Cases

In [0]:
#Apply column names and data types to the usage data using a DDL schema string with .schema()
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.show()
df7.printSchema()

In [0]:
#Apply column names and datatype using the StructType with IntegerType, StringType, TimestampType and other classes for towers data
from pyspark.sql.types import StructType,IntegerType,StringType,TimestampType,StructField
Struct_str=StructType([StructField("id",IntegerType(),True),StructField("customer_id",IntegerType(),False),StructField("tower_id",StringType(),False),StructField("signal_strength",IntegerType(),True),StructField("timestamp",TimestampType(),True)])
df8=spark.read.schema(Struct_str).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_logs_region1.csv",sep="|",header=True)
df8.show()

### Write Operations (Data Conversion/Schema Migration) – CSV Format Use Cases

In [0]:
#Write customer data into CSV format using overwrite mode
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/csvout/", mode="overwrite")

In [0]:
#Write usage data into CSV format using append mode
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_csvout/", mode="append")

In [0]:
#Write tower data into CSV format with header enabled and custom separator (|)
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
df2.write.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_csvout/", header=True, sep="|")

In [0]:
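#Read the tower data back from the CSV output above into a DataFrame and show only 5 rows
df_tower_csv=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_csvout/", header=True, sep="|", inferSchema=True)
df_tower_csv.show(5)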

### Write Operations (Data Conversion/Schema Migration) – JSON Format Use Cases

In [0]:
#Write customer data into JSON format using overwrite mode
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/jsonout/", mode="overwrite")

In [0]:
#Write usage data into JSON format using append mode and snappy compression format
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_jsonout/", mode="append", compression="snappy")

In [0]:
#Write tower data into JSON format using ignore mode and observe the behavior of this mode
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
df2.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_jsonout/", mode="ignore")
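This is the behavior the cell above asks about. On the first run, `ignore` writes the JSON files. On any later run the output path already exists, so Spark skips the write without raising an error. The sketch below models the four save modes with a dictionary standing in for the file system. `storage` and `save` are hypothetical names for illustration, not Spark APIs.

```python
def save(storage, path, rows, mode="errorifexists"):
    """Hypothetical in-memory model of DataFrameWriter save modes.

    `storage` is a dict standing in for the file system: path -> list of rows.
    """
    exists = path in storage
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"{path} already exists")
        storage[path] = list(rows)
    elif mode == "overwrite":
        storage[path] = list(rows)  # existing contents are replaced
    elif mode == "append":
        storage.setdefault(path, []).extend(rows)  # new rows are added
    elif mode == "ignore":
        if not exists:
            storage[path] = list(rows)
        # if the path already exists, the write is silently skipped
    else:
        raise ValueError(f"unknown save mode: {mode}")


fs = {}
save(fs, "tower_jsonout", ["run1"], mode="ignore")  # first run: data is written
save(fs, "tower_jsonout", ["run2"], mode="ignore")  # second run: no-op, no error
print(fs["tower_jsonout"])  # ['run1']
```

`error` (the default, also spelled `errorifexists`) is why cells without an explicit mode fail when they are rerun.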

In [0]:
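#Read the tower data back from the JSON output above into a DataFrame and show only 5 rows
df_tower_json=spark.read.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_jsonout/")
df_tower_json.show(5)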

### Write Operations (Data Conversion/Schema Migration) – Parquet Format Use Cases

In [0]:
#Write customer data into Parquet format using overwrite mode with gzip compression
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/parquetout/", mode="overwrite", compression="gzip")

In [0]:
#Write usage data into Parquet format using error mode (the write fails if the output path already exists)
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_parquetout/", mode="error")

In [0]:
#Write tower data into Parquet format with gzip compression
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
df2.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_parquetout/", compression="gzip")

In [0]:
#Read the usage data in a dataframe and show only 5 rows.
df7.show(5)

### Write Operations (Data Conversion/Schema Migration) – ORC Format Use Cases

In [0]:
#Write customer data into ORC format using overwrite mode
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/orcout/", mode="overwrite")

In [0]:
#Write usage data into ORC format using append mode
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orcout/", mode="append")

In [0]:
#Write tower data into ORC format and see the output file structure
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
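df2.write.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_orcout/")
#List the output directory to see the files Spark wrote
display(dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_orcout/"))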

In [0]:
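#Read the usage data back from the ORC output above into a DataFrame and show only 5 rows
df_usage_orc=spark.read.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orcout/")
df_usage_orc.show(5)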

### Write Operations (Data Conversion/Schema Migration) – Delta Format Use Cases

In [0]:
#Write customer data into Delta format using overwrite mode
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.format("delta").mode("overwrite").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/deltaout/")

In [0]:
#Write usage data into Delta format using append mode
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_deltaout/", mode="append")

In [0]:
#Write tower data into Delta format and see the output file structure
df2=spark.read.csv(path=["/Volumes/telecom_catalog_assign/landing_zone/landing_vol/","/Volumes/lakehouse/default/volume1/"], header=True,sep="|",inferSchema=True, pathGlobFilter="tower*.csv",recursiveFileLookup=True)
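#Delta rejects spaces in column names (the tower header has a trailing space after timestamp), so replace them with underscores
df2 = df2.toDF(*[c.replace(" ", "_") for c in df2.columns])
df2.write.format("delta").mode("overwrite").option("mergeSchema","true").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_deltaout/")
#The output holds Parquet data files plus a _delta_log directory containing the transaction log
display(dbutils.fs.ls("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/tower/tower_deltaout/"))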
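Without column mapping, Delta Lake rejects column names that contain any of the characters ` ,;{}()\n\t=`, not only spaces. The sketch below generalizes the space replacement from the cell above to that whole set. `sanitize_columns` is a hypothetical helper. In PySpark it could be applied as `df2.toDF(*sanitize_columns(df2.columns))`.

```python
# Characters Delta Lake rejects in column names when column mapping is not enabled
INVALID_DELTA_CHARS = " ,;{}()\n\t="


def sanitize_columns(columns, replacement="_"):
    """Replace each Delta-invalid character in every column name."""
    table = str.maketrans({ch: replacement for ch in INVALID_DELTA_CHARS})
    return [name.translate(table) for name in columns]


cols = ["event_id", "customer_id", "tower_id", "signal_strength", "timestamp "]
print(sanitize_columns(cols))
# ['event_id', 'customer_id', 'tower_id', 'signal_strength', 'timestamp_']
```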

In [0]:
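#Read the usage data back from the Delta output above into a DataFrame and show only 5 rows
df_usage_delta=spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_deltaout/")
df_usage_delta.show(5)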

### 11. Write Operations (Lakehouse Use Cases) – Delta Table Use Cases

In [0]:
#Write customer data using saveAsTable() as a managed table
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.saveAsTable("telecom_catalog_assign.landing_zone.customer_managed")

In [0]:
#Write usage data using saveAsTable() with overwrite mode
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.saveAsTable("telecom_catalog_assign.landing_zone.usage_managed", mode="overwrite")


In [0]:
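#Drop the managed table and verify data removal
spark.sql("DROP TABLE telecom_catalog_assign.landing_zone.customer_managed")
#Verify: the dropped table should no longer be listed in the schema (this query should return no rows)
spark.sql("SHOW TABLES IN telecom_catalog_assign.landing_zone LIKE 'customer_managed'").show()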

In [0]:
#Use spark.sql() to run simple queries on the tables created above (spark.sql returns a DataFrame; call .show() to display it)
spark.sql("select * from telecom_catalog_assign.landing_zone.usage_managed").show()

In [0]:
%sql
select * from telecom_catalog_assign.landing_zone.usage_managed

### 12. Write Operations (Lakehouse Use Cases) – Delta Table Use Cases (insertInto)

In [0]:
%sql
create table telecom_catalog_assign.landing_zone.customer1 (id STRING,name String, age String, city String, type String)

In [0]:
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.printSchema()
df6.show(2)

In [0]:
#Write customer data into the new table using insertInto() and observe the behavior
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.insertInto("telecom_catalog_assign.landing_zone.customer1", overwrite=True)
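`insertInto()` ignores the DataFrame's column names and maps columns to the target table by position. According to the PySpark documentation, this is unlike `saveAsTable()`. Every column of customer1 is STRING, so a DataFrame whose columns arrived in a different order would be inserted without any error. The plain-Python model below shows the difference. The reordered column list is hypothetical, and `insert_by_position` / `insert_by_name` are illustrative names, not Spark APIs.

```python
def insert_by_position(table_columns, df_columns, rows):
    """Model of insertInto(): DataFrame column names are ignored; values go in by position."""
    if len(table_columns) != len(df_columns):
        raise ValueError("column count mismatch")
    return [dict(zip(table_columns, row)) for row in rows]


def insert_by_name(table_columns, df_columns, rows):
    """Model of name-based resolution, for contrast."""
    position = {name: i for i, name in enumerate(df_columns)}
    return [{col: row[position[col]] for col in table_columns} for row in rows]


table_cols = ["id", "name", "age", "city", "type"]
df_cols = ["name", "id", "age", "city", "type"]  # hypothetical: same columns, different order
rows = [("Arun", "101", "31", "Chennai", "PREPAID")]

print(insert_by_position(table_cols, df_cols, rows)[0]["id"])  # Arun (silently misplaced)
print(insert_by_name(table_cols, df_cols, rows)[0]["id"])      # 101
```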

In [0]:
%sql
create table telecom_catalog_assign.landing_zone.usage1 (customer_id INT, voice_mins INT, data_mb INT, sms_count INT)

In [0]:
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.printSchema()
df7.write.insertInto("telecom_catalog_assign.landing_zone.usage1", overwrite=True)

### 13. Write Operations (Data Conversion/Schema Migration) – XML Format Use Cases

In [0]:
#Write customer data into XML format using rowTag as cust
df6=spark.read.csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_csv.csv", header=False, inferSchema=False).toDF("id","name","age","city","type")
df6.write.xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/customer/customer_xml", rowTag="cust")
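With `rowTag="cust"`, each DataFrame row becomes one `<cust>` element whose child elements hold the column values, and all rows are wrapped in a single root element. The standard-library sketch below shows that layout. It assumes `ROWS` is the writer's default `rootTag`. Skipping null fields is a choice made in this sketch only, not a statement about how Spark handles nulls. `rows_to_xml` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET


def rows_to_xml(columns, rows, row_tag, root_tag="ROWS"):
    """Wrap each row in a <row_tag> element; each column becomes a child element."""
    root = ET.Element(root_tag)
    for row in rows:
        row_el = ET.SubElement(root, row_tag)
        for col, value in zip(columns, row):
            if value is None:
                continue  # sketch-only choice: null fields are omitted
            ET.SubElement(row_el, col).text = str(value)
    return ET.tostring(root, encoding="unicode")


xml_text = rows_to_xml(["id", "name"], [("101", "Arun")], row_tag="cust")
print(xml_text)  # <ROWS><cust><id>101</id><name>Arun</name></cust></ROWS>
```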


In [0]:
#Write usage data into XML format using overwrite mode with the rowTag as usage
schema1="customer_id integer,voice_mins integer,data_mb integer,sms_count integer"
df7=spark.read.schema(schema1).csv("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_tsv.csv",header=True,sep="\t")
df7.write.xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_xml", rowTag="usage", mode="overwrite")

### 15. Try permutations and combinations of Schema Migration & Data Conversion operations like...

In [0]:
#Read any one of the above orc data in a dataframe and write it to dbfs in a parquet format
df8=spark.read.orc("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orcout/")
df8.write.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet", mode="overwrite")

In [0]:
#Read any one of the above parquet data in a dataframe and write it to dbfs in a delta format
df9 = spark.read.parquet("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet/")
df9.write.format("delta").save("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta")

In [0]:
#Read any one of the above delta data in a dataframe and write it to dbfs in a xml format
df10 = spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta")
df10.write.xml("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta_xml",rowTag="usage")

In [0]:
#Read any one of the above delta table in a dataframe and write it to dbfs in a json format
df11= spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta")
df11.write.json("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta_json")

In [0]:
#Read any one of the above delta table in a dataframe and write it to another table
df12=spark.read.format("delta").load("/Volumes/telecom_catalog_assign/landing_zone/landing_vol/usage/usage_orc_parquet_delta")
df12.write.saveAsTable("telecom_catalog_assign.landing_zone.usage_orc_parquet_delta")