In [0]:
from pyspark.sql.functions import rand

df  = spark.read.format('csv')\
    .option('header',True)\
    .option('inferschema',True)\
    .load('/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv')

df.limit(5).display()

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


## converting csv to delta **format**

In [0]:
df.write.format('delta')\
    .mode('overwrite')\
    .saveAsTable("delta_ecommerce_data")

In [0]:
spark.table("delta_ecommerce_data").display()


event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-21T19:47:52.000Z,view,4804056,2053013554658804075,electronics.audio.headphone,apple,160.34,513066676,cafca09f-7785-461f-9cbb-67e03787956e
2019-10-21T19:47:52.000Z,view,12300181,2053013556311359947,construction.tools.drill,dewalt,356.2,553528018,002f83d7-f527-47e0-96ad-162719b991a6
2019-10-21T19:47:52.000Z,view,1005098,2053013555631882655,electronics.smartphone,samsung,142.07,544071941,69a2eb47-e41b-44b9-8455-29925ec81504
2019-10-21T19:47:52.000Z,view,9900168,2053013552570040549,electronics.video.projector,vivitek,382.3,530379686,207304da-e4c4-4dcf-8602-6e803cee8466
2019-10-21T19:47:52.000Z,view,4300077,2053013552385491165,,timberk,43.73,527567542,3f9908fd-3ab1-4b9d-b8bb-f06fb9f7a162
2019-10-21T19:47:52.000Z,view,28718444,2053013565069067197,apparel.shoes.keds,strobbs,64.61,543674596,6eddf217-851d-4ac0-9ebe-2a90d15c53d3
2019-10-21T19:47:52.000Z,view,10400515,2053013553257906447,kids.toys,woddon,23.66,513468166,9eba2939-9409-4492-8ec5-4c4f30981ee8
2019-10-21T19:47:52.000Z,view,23301397,2053013561956893455,,disney,55.81,557973001,32ec5a98-2377-4952-9d7b-598f927c765d
2019-10-21T19:47:53.000Z,view,21401442,2053013561579406073,electronics.clocks,casio,37.83,516766365,5bddb7b6-7d63-443d-a39a-11f552cb6c04
2019-10-21T19:47:53.000Z,view,28718918,2053013565069067197,apparel.shoes.keds,nexpero,71.82,519136300,2fc01244-2522-46de-b1d8-a067ec9ae033


## creating sql delta **table**

In [0]:
%sql
create table if not exists customer_delta
(
  customer_id int,
  customer_name string,
  customer_age int
)
using delta;

In [0]:
%sql
insert into customer_delta values(1,'rohan',30),(2,'riya',25),(3,'shyam',32);

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
select * from customer_delta;

customer_id,customer_name,customer_age
1,rohan,30
2,riya,25
3,shyam,32


In [0]:
%sql
update customer_delta 
set customer_name='Riya'
where customer_id=2;

num_affected_rows
1


In [0]:
%sql
select * from customer_delta;

customer_id,customer_name,customer_age
1,rohan,30
3,shyam,32
2,Riya,25


# schema enforcement

In [0]:
data_list = [(1,'chair','furniture',1000),(2,'mouse','electronic',5000),
            (3,'table','furniture',2000),(4,'keyboard','electronic',4000)
            ]

In [0]:
df1 = spark.createDataFrame(data_list)
df1.show()

+---+--------+----------+----+
| _1|      _2|        _3|  _4|
+---+--------+----------+----+
|  1|   chair| furniture|1000|
|  2|   mouse|electronic|5000|
|  3|   table| furniture|2000|
|  4|keyboard|electronic|4000|
+---+--------+----------+----+



In [0]:
df1=spark.createDataFrame(data_list).toDF('product_id','product_name','category','price')
df1.show()
df1.printSchema()

+----------+------------+----------+-----+
|product_id|product_name|  category|price|
+----------+------------+----------+-----+
|         1|       chair| furniture| 1000|
|         2|       mouse|electronic| 5000|
|         3|       table| furniture| 2000|
|         4|    keyboard|electronic| 4000|
+----------+------------+----------+-----+

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: long (nullable = true)



In [0]:
from pyspark.sql.types import *

schema = StructType([StructField('product_id', IntegerType(), True),
                     StructField('product_name', StringType(), True),
                     StructField('category', StringType(), True),
                     StructField('price', DoubleType(), True)])


            

p  = spark.createDataFrame([(1,'chair','furniture',1000),(2,'mouse','electronic',5000),
                                   (3,'table','furniture',2000),(4,'keyboard','electronic',4000)
                                   ],schema
                           )


p.write.format('delta')\
    .mode('overwrite')\
    .option('mergeSchema','true')\
    .saveAsTable("delta_products")





In [0]:
spark.table('delta_products').display()

product_id,product_name,category,price
1,chair,furniture,1000.0
2,mouse,electronic,5000.0
3,table,furniture,2000.0
4,keyboard,electronic,4000.0


In [0]:
%sql
insert into delta_products values(5,'door','furniture')

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-6130932507447422>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m"[39m[38;5;124minsert into delta_products values(5,[39m[38;5;124m'[39m[38;5;124mdoor[39m[38;5;124m'[39m[38;5;124m,[39m[38;5;124m'[39m[38;5;124mfurniture[39m[38;5;124m'[39m[38;5;124m)[39m[38;5;130;01m\n[39;00m[38;5;124m"[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, cell)[0m
[1;32m   2539[0m [38;5;28;01mwith[39;00m [38;5;28mself[39m[38;5;241m.[39mbuiltin_trap:
[1;32m   2540[0m     args [38;5;241m=[39m (magic_arg_s, cell)
[0;32m-> 254

In [0]:
%sql
insert into delta_products values(1,'desk','furniture',2000)

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
delete from delta_products where product_id=5;
    
select * from delta_products

product_id,product_name,category,price
1,chair,furniture,1000.0
2,mouse,electronic,5000.0
3,table,furniture,2000.0
4,keyboard,electronic,4000.0


In [0]:
spark.table('delta_products').display()

product_id,product_name,category,price
1,chair,furniture,1000.0
2,mouse,electronic,5000.0
3,table,furniture,2000.0
4,keyboard,electronic,4000.0


## handling duplicates

In [0]:
%sql
select product_id,count(*)
from delta_products
group by product_id
having count(*)>1

product_id,count(*)
1,2
