In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Load the employee and department extracts; the first row of each CSV holds the column names.
# No schema / inferSchema is given, so every column arrives as a string (SAL is cast later).
df_emp = spark.read.option('header', True).csv('/Volumes/workspace/default/test_practice/emp.csv')
df_dept = spark.read.option('header', True).csv('/Volumes/workspace/default/test_practice/dept.csv')

In [0]:
# Preview the employee rows as read from CSV (all columns are strings until SAL is cast below).
df_emp.display()

In [0]:
# SAL is read from the CSV as a string; cast it so the salary ordering below is numeric, not lexical.
df_emp = df_emp.withColumn('SAL', df_emp['SAL'].cast(IntegerType()))

In [0]:
from pyspark.sql.window import Window

# Rank employees by salary inside each department, highest salary first.
# rank() gives tied salaries the same rank and leaves a gap after the tie.
win = Window.partitionBy(col('DEPTNO')).orderBy(col('SAL').desc())
df_emp.withColumn('rank', rank().over(win)).display()

In [0]:
from pyspark.sql.window import Window

# Number every employee 1..N by descending salary across the whole table.
# With no partition columns Spark moves all rows into a single partition for this window,
# and employees with equal SAL are numbered in an arbitrary (non-deterministic) order.
win = Window.orderBy(col('SAL').desc())
df_emp.withColumn('row_num', row_number().over(win)).display()

In [0]:
# Practice problem (not solved in this notebook):
#   sales   -- cust_id, product_id, trans_date, price
#   product -- product_id, product_name, cat_name
# Which product was sold the most in the last 30 days?

In [0]:
# Sample record: an id plus a raw JSON document stored as a plain string column.
# Keys and string values use double quotes so the text is valid JSON for any parser,
# not only for Spark's lenient (allowSingleQuotes) JSON handling.
s1 = '{"ZIP_CD":"212211","ADDRESS":"ASUGWMS","AREA_CD":32}'
df_json = spark.createDataFrame([(1, s1)], schema=['id', 'json_string'])
df_json.display()

In [0]:
# json_string is a plain STRING column, so dot notation ('json_string.ZIP_CD') cannot reach
# into it and fails analysis. Pull the field out with a JSONPath expression instead; the
# result is a string, or NULL when the path is absent or the text does not parse.
df_json.select(get_json_object(col('json_string'), '$.ZIP_CD').alias('ZIP_CD')).display()

In [0]:
# Target struct for parsing json_string; all three fields are nullable.
json_schema = (
    StructType()
    .add('ZIP_CD', StringType(), True)
    .add('ADDRESS', StringType(), True)
    .add('AREA_CD', IntegerType(), True)
)

In [0]:
# Parse the JSON text into a struct column named json_ext.
# NOTE: this rebinds df_json to that single column (id and json_string are dropped), so
# re-running this cell fails; re-run the createDataFrame cell first.
df_json = df_json.select(from_json('json_string', json_schema).alias('json_ext'))
df_json.display()

In [0]:
# Expand the parsed struct into top-level ZIP_CD, ADDRESS and AREA_CD columns.
df_json.select('json_ext.*').display()

In [0]:
# Flat schema for file.json (rebinds json_schema; the ZIP/ADDRESS schema above is replaced).
json_schema = (
    StructType()
    .add('NAME', StringType(), True)
    .add('AGE', IntegerType(), True)
    .add('CITY', StringType(), True)
    .add('STATE', StringType(), True)
)

In [0]:
# Read file.json with the explicit schema above instead of letting Spark infer one.
spark.read.schema(json_schema).json('/Volumes/workspace/default/test_practice/file.json').display()

In [0]:
# Schema for file_nested.json: ADDRESS is a nested struct holding CITY and STATE.
json_nest_sch = (
    StructType()
    .add('NAME', StringType(), True)
    .add('AGE', IntegerType(), True)
    .add(
        'ADDRESS',
        StructType()
        .add('CITY', StringType(), True)
        .add('STATE', StringType(), True),
        True,
    )
)

In [0]:
# Read the nested JSON file; ADDRESS comes back as a struct column per json_nest_sch.
df_1 = spark.read.schema(json_nest_sch).json('/Volumes/workspace/default/test_practice/file_nested.json')
df_1.display()

In [0]:
# Serialize the ADDRESS struct back into a JSON string (the inverse of from_json).
df_1.select(to_json('ADDRESS').alias('ADDRESS')).display()

In [0]:
# Flatten ADDRESS into top-level CITY and STATE columns so the rows can be written as CSV.
# NOTE: rebinds df_1; re-running this cell fails once ADDRESS is gone.
df_1 = df_1.select('NAME', 'AGE', 'ADDRESS.*')

In [0]:
# Write the flattened rows as CSV with a header row, replacing any previous output at this path.
df_1.write.mode('overwrite').option('header', True).csv('/Volumes/workspace/default/test_practice/file_nested')

In [0]:
# Read the CSV output directory back to check the round trip (no schema given, so columns are strings).
spark.read.option('header', True).csv('/Volumes/workspace/default/test_practice/file_nested/').display()

## Schema Enforcement

In [0]:
# Enforced schema for the student data. error_rec is an extra string column that receives
# the raw text of rows that fail to parse (see columnNameOfCorruptRecord below).
schema = (
    StructType()
    .add('NAME', StringType(), True)
    .add('AGE', FloatType(), True)
    .add('ADDRESS', StringType(), True)
    .add('error_rec', StringType(), True)
)

In [0]:
# Small in-memory sample validated against `schema`.
# Each row must supply one value per schema field (4), so error_rec is passed explicitly as None.
# AGE values are floats because the field is FloatType; classic PySpark schema verification
# rejects Python ints for a FloatType field.
df = spark.createDataFrame(
    [
        ('ABGD', 34.0, 'GWUGWHDIE', None),
        ('UEGS', 38.0, 'HWUYDJW', None),
        ('GWUGD', 67.0, 'YGYUW*', None),
    ],
    schema=schema,
)
df.display()

In [0]:
# PERMISSIVE mode keeps malformed rows instead of failing: their fields are NULL and the raw
# line is stored in error_rec, the column named by columnNameOfCorruptRecord (present in `schema`).
df = (
    spark.read
    .schema(schema)
    .option('header', 'True')
    .option('mode', 'permissive')
    .option('columnNameOfCorruptRecord', 'error_rec')
    .csv('/Volumes/workspace/default/test_practice/bad_rec_data.csv')
)
df.display()

In [0]:
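# Schema evolution options for Delta writes:
#   mergeSchema     -- add new columns from the incoming DataFrame to the table schema (used below)
#   overwriteSchema -- replace the table schema entirely (used together with mode('overwrite'))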

In [0]:
# Persist df (including its error_rec column) to the catalog table, appending to any existing rows.
df.write.saveAsTable('workspace.default.student', mode='append')

In [0]:
# Append the same rows to a path-based Delta table (addressed by location, not by catalog name).
df.write.save('/Volumes/workspace/default/test_practice/student_details/', format='delta', mode='append')

In [0]:
# Evolved schema: same as `schema` plus a new GENDER column, used to demonstrate mergeSchema.
schema_new = (
    StructType()
    .add('NAME', StringType(), True)
    .add('AGE', FloatType(), True)
    .add('ADDRESS', StringType(), True)
    .add('GENDER', StringType(), True)
    .add('error_rec', StringType(), True)
)

In [0]:
# One row against schema_new (5 fields). AGE is a float because the field is FloatType
# (classic PySpark schema verification rejects a Python int there); error_rec is explicitly None.
# GENDER is the column the mergeSchema writes below add to the existing tables.
df_new = spark.createDataFrame([('AVDW', 42.0, 'GWUGWBSUW', 'F', None)], schema=schema_new)
df_new.display()

In [0]:
# Append rows carrying the new GENDER column; mergeSchema lets Delta add the column to the
# table schema instead of rejecting the write as a schema mismatch.
df_new.write.saveAsTable('workspace.default.student', mode='append', mergeSchema='true')

In [0]:
# Same schema-evolving append, this time to the path-based Delta table.
df_new.write.save(
    '/Volumes/workspace/default/test_practice/student_details/',
    format='delta',
    mode='append',
    mergeSchema='true',
)

In [0]:
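%md
### Lakehouse and Delta Lake
- **Delta Lake** is the storage layer behind the lakehouse.
- A **Delta table** is chosen with `format('delta')` (or `USING DELTA` in SQL).
- Its data is stored as **Parquet** files.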

In [0]:
%md
**Delta format** = Parquet files (the table data) + a `_delta_log` directory (JSON commit files plus `.crc` checksum files that hold the table metadata / transaction log).

In [0]:
# Read the path-based Delta table back; after the mergeSchema append it also has GENDER.
df = spark.read.load('/Volumes/workspace/default/test_practice/student_details', format='delta')
df.display()

In [0]:
%sql
-- Create an empty CUSTOMER Delta table. IF NOT EXISTS keeps this cell safe to re-run:
-- a plain CREATE TABLE fails once the table already exists.
CREATE TABLE IF NOT EXISTS CUSTOMER(
  CUST_ID INT,
  CUST_NAME STRING,
  CUST_CITY STRING
)
USING DELTA

In [0]:
from delta.tables import DeltaTable

In [0]:
# Rebind `df` to a DeltaTable handle (not a DataFrame) for the catalog table
# workspace.default.student; later cells call .history(), .toDF() and .restoreToTimestamp() on it.
df = DeltaTable.forName(spark,'workspace.default.student')

In [0]:
# Show the table's commit history (the writes made above appear as versions).
df.history().display()

In [0]:
# Convert the DeltaTable handle to a DataFrame to display the table's current contents.
df.toDF().display()

In [0]:
# Open a Delta table by storage path instead of catalog name and display its contents.
# NOTE(review): nothing in this notebook writes target_data; the Delta table must already
# exist at this path or the cell fails on a fresh run.
DeltaTable.forPath(spark, '/Volumes/workspace/default/test_practice/target_data').toDF().display()

In [0]:
%sql
-- SQL equivalent of DeltaTable.history() for the student table.
DESCRIBE HISTORY STUDENT

In [0]:
# Current version of the student table; reading by name uses the table's own (Delta) format.
spark.read.table('student').display()

In [0]:
# Time travel by version: read student as it was at its first commit (version 0).
spark.read.format('delta').options(versionAsOf='0').table('student').display()

In [0]:
# Time travel by timestamp: read student as it was at the given instant.
# `timestampAsOf` is the option name documented by Delta Lake. The timestamp must fall
# within the table's retained history or the read fails.
spark.read.format('delta').option('timestampAsOf', '2025-07-22T02:13:08.000+00:00').table('student').display()

In [0]:
%sql
-- SQL time travel: query student as of version 0.
select * from student version as of 0

In [0]:
# Roll the student table back to its state at this timestamp. This writes to the table:
# it is recorded as a new RESTORE entry in its history. The returned metrics DataFrame is not displayed.
# NOTE(review): requires `df` to be the DeltaTable from the DeltaTable.forName cell, not the
# DataFrame read earlier; run cells in order.
df.restoreToTimestamp('2025-07-22T02:13:08.000+00:00')


In [0]:
%sql
-- Roll student back to version 0; like the timestamp restore, this adds a new commit to the history.
RESTORE TABLE student VERSION AS OF 0 

In [0]:
%sql
-- History after the restores: each RESTORE appears as its own version.
DESCRIBE HISTORY student

In [0]:
%sql
-- Contents of student after restoring to version 0.
select * from student