In [0]:
class bronze():
    def __init__(self):
        self.dir = "dbfs:/FileStore/tables/"

    def getschema(self):
        return """InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
                CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint, 
                PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double, 
                DeliveryType string,
                DeliveryAddress struct<AddressLine string, City string, ContactNumber string, PinCode string, 
                State string>,
                InvoiceLineItems array<struct<ItemCode string, ItemDescription string, 
                    ItemPrice double, ItemQty bigint, TotalValue double>>"""
    def read_invoices(self):
        return (spark.readStream
                .format("json")\
                .schema(self.getschema())\
                .option("cleanSource","Archive")\
                .option("SourceArchiveDir",f"{self.dir}/poc/archive")\
                .load(f"{self.dir}/spark_structured_streaming/invoices/"))
    def process(self):
        invoice_df = self.read_invoices()
        display(invoice_df)
        query = (invoice_df.writeStream
                    .queryName("invoicesquery_1")\
                    .option("CheckPointLocation",f"{self.dir}/cpl_1/logs")\
                    .outputMode("append")\
                    .toTable("brinvoices_1"))
        return query

In [0]:
class silver():
    def __init__(self):
        self.dir = "FileStore/tables/spark_structured_streaming"
    def read_inv(self):
        return (spark.readStream.table("brinvoices_1"))
    def explode_df(self,inv_df):
        from pyspark.sql.functions import explode,expr 
        return (inv_df.selectExpr("InvoiceNumber", "CreatedTime", "StoreID", "PosID",
                                      "CustomerType", "PaymentMethod", "DeliveryType", "DeliveryAddress.City",
                                      "DeliveryAddress.State","DeliveryAddress.PinCode", 
                                      "explode(InvoiceLineItems) as LineItem"))
    def flattend_df(self,explode_df):
        from pyspark.sql.functions import expr
        return (explode_df.withColumn("ItemCode", expr("LineItem.ItemCode"))
                        .withColumn("ItemDescription", expr("LineItem.ItemDescription"))
                        .withColumn("ItemPrice", expr("LineItem.ItemPrice"))
                        .withColumn("ItemQty", expr("LineItem.ItemQty"))
                        .withColumn("TotalValue", expr("LineItem.TotalValue"))
                        .drop("LineItem"))
    def append_data(self,flattend_df):
        return (flattend_df.writeStream
                 .format("delta")\
                 .option("checkpointLocation",f"{self.dir}/cpl_1/logs")\
                 .queryName("test_1")\
                 .outputMode("append")\
                 .toTable("slinvoices_1"))
    def process(self):
        inv_df = self.read_inv()
        explode_df = self.explode_df(inv_df)
        flatt_df = self.flattend_df(explode_df)
        app = self.append_data(flatt_df)
        return app


In [0]:
br = bronze()
sl = silver()
b = br.process()
s = sl.process()

InvoiceNumber,CreatedTime,StoreID,PosID,CashierID,CustomerType,CustomerCardNo,TotalAmount,NumberOfItems,PaymentMethod,TaxableAmount,CGST,SGST,CESS,DeliveryType,DeliveryAddress,InvoiceLineItems
94201418,1595689270697,STR7443,POS365,OAS845,PRIME,8790333340,9000.0,4,CASH,9000.0,225.0,225.0,11.25,HOME-DELIVERY,"List(House No 383, 4427 Pellentesque Rd., Bokaro Steel City, 1442202063, 509723, Jharkhand)","List(List(258, Closet, 1687.0, 2, 3374.0), List(538, Grandmother clock, 1301.0, 1, 1301.0), List(528, Projection clock, 2365.0, 1, 2365.0), List(673, Dough scraper, 980.0, 2, 1960.0))"
8749479,1595689270697,STR5864,POS872,OAS287,PRIME,7589671731,7646.0,4,CASH,7646.0,191.15,191.15,9.5575,HOME-DELIVERY,"List(1852 Est St., Imphal, 6124913142, 120023, Manipur)","List(List(593, Hanging curtains, 1896.0, 2, 3792.0), List(308, Butterfly chair, 857.0, 2, 1714.0), List(383, Innerspring Mattress, 655.0, 1, 655.0), List(423, Quilt, 1485.0, 1, 1485.0))"
91509413,1595689270798,STR2629,POS253,OAS737,NONPRIME,2461788838,7453.0,3,CARD,7453.0,186.325,186.325,9.31625,HOME-DELIVERY,"List(House No 740, 6689 Tempor Av., Bharatpur, 8563079826, 932264, Rajasthan)","List(List(528, Projection clock, 2365.0, 2, 4730.0), List(503, Chef's knife, 1973.0, 1, 1973.0), List(653, Browning tray, 375.0, 2, 750.0))"
54315437,1595689270798,STR5864,POS872,OAS287,PRIME,7589671731,4006.0,2,CASH,4006.0,100.15,100.15,5.0075,HOME-DELIVERY,"List(Flat No. #335-7984 Senectus Rd., Pali, 6125618251, 900530, Rajasthan)","List(List(238, Dining table, 1582.0, 2, 3164.0), List(273, Bedroom set, 842.0, 1, 842.0))"
84526449,1595689270798,STR2952,POS152,OAS329,PRIME,3027514652,3288.0,1,CASH,3288.0,82.2,82.2,4.11,TAKEAWAY,,"List(List(458, Wine glass, 1644.0, 2, 3288.0))"
4014389,1595689270898,STR5494,POS353,OAS969,NONPRIME,8189067868,1894.0,1,CASH,1894.0,47.35,47.35,2.3675,TAKEAWAY,,"List(List(268, Floating shelf, 1894.0, 1, 1894.0))"
36524242,1595689270898,STR3781,POS129,OAS311,PRIME,4692642935,1955.0,1,CASH,1955.0,48.875,48.875,2.44375,HOME-DELIVERY,"List(8612 Non Rd., Guna, 8336980338, 210683, Madhya Pradesh)","List(List(643, Blow torch, 1955.0, 1, 1955.0))"
35058148,1595689270898,STR1534,POS135,OAS285,PRIME,5582740626,2944.0,3,CARD,2944.0,73.60000000000001,73.60000000000001,3.68,TAKEAWAY,,"List(List(668, Crab cracker, 785.0, 1, 785.0), List(658, Chinois, 567.0, 1, 567.0), List(633, Cafe Curtains, 796.0, 2, 1592.0))"
95067626,1595689270999,STR2629,POS172,OAS622,NONPRIME,7829975914,2297.0,2,CASH,2297.0,57.425,57.425,2.87125,HOME-DELIVERY,"List(7409 Laoreet Rd., Jammu, 8335722151, 697806, Jammu and Kashmir)","List(List(233, Coffee table, 1055.0, 1, 1055.0), List(408, Confidante, 1242.0, 1, 1242.0))"
17921881,1595689270999,STR1955,POS324,OAS183,NONPRIME,3916555911,773.0,1,CASH,773.0,19.325000000000003,19.325000000000003,0.96625,HOME-DELIVERY,"List(529-4520 Libero. Ave, Raigarh, 3057906681, 183678, Chhattisgarh)","List(List(398, Latex Mattress, 773.0, 1, 773.0))"
