In [0]:
dbutils.fs.ls("dbfs:/FileStore/tables")

Out[22]: [FileInfo(path='dbfs:/FileStore/tables/Employee.csv', name='Employee.csv', size=84229, modificationTime=1685525564000)]

In [0]:
dbutils.fs.mkdirs("dbfs:/FileStore/tables/datasets/laptop_source_stream")

Out[29]: True

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

custom_schema = StructType(
    [
        StructField("Employee_id", IntegerType(),True),
        StructField("First_name", StringType(),True),
        StructField("Last_name", StringType(),True),
        StructField("Gender", StringType(), True),
        StructField("salary", IntegerType(), True),
        StructField("Date_of_Birth", StringType(),True),
        StructField("Age", IntegerType(),True),
        StructField("Country", StringType(),True),
        StructField("Department_id", IntegerType(), True),
        StructField("Date_of_Joining", StringType(), True),
        StructField("Manager_id", IntegerType(), True),
        StructField("Currency", StringType(), True),
        StructField("End_Date", StringType(), True),
    ]
)  
employee_df = spark.read.format("csv")\
    .option("header", "true")\
    .schema(custom_schema)\
    .load("/FileStore/tables/Employee.csv")

employee_df.printSchema()

root
 |-- Employee_id: integer (nullable = true)
 |-- First_name: string (nullable = true)
 |-- Last_name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- Date_of_Birth: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Department_id: integer (nullable = true)
 |-- Date_of_Joining: string (nullable = true)
 |-- Manager_id: integer (nullable = true)
 |-- Currency: string (nullable = true)
 |-- End_Date: string (nullable = true)



In [0]:
employee_df.isStreaming

Out[25]: False

In [0]:
employee_stream_df = spark.readStream.format("csv")\
    .option("header", "true")\
    .schema(custom_schema)\
    .load("/FileStore/tables/datasets/laptop_source_stream/")

employee_stream_df.isStreaming

Out[31]: True

In [0]:
employee_stream_df.display()

Employee_id,First_name,Last_name,Gender,salary,Date_of_Birth,Age,Country,Department_id,Date_of_Joining,Manager_id,Currency,End_Date
1379,Klarrisa,Caile,Female,123945,08/09/1980,33,Australia,4,13/05/2003,4,USD,17/08/2006
1144,Rosemaria,Shieldon,Female,105777,16/08/1987,25,Australia,19,09/06/2000,6,USD,09/12/2011
1023,Biddie,Paolone,Male,113582,17/10/1981,36,UAE,2,07/06/2007,10,USD,17/11/2018
1437,Abner,Ellerington,Male,112349,24/08/1990,38,Germay,5,23/02/2011,1,USD,28/11/2018
1247,Nils,Ilyinski,Female,111907,27/03/1986,29,Canada,4,27/05/2003,3,USD,20/05/2006
1128,Patrizius,Sanpher,Female,146209,08/02/1995,65,UAE,12,20/01/2010,1,USD,17/08/2012
1373,Lindy,Copestick,Female,104192,10/08/1986,50,Sweden,1,15/05/2011,2,USD,13/05/2016
1442,Allegra,Syseland,Female,100917,14/04/1998,63,Denmark,7,05/10/2010,6,USD,25/05/2010
1042,Trude,Phippen,Male,131623,30/05/1991,64,Sweden,2,12/10/2004,4,USD,12/09/2013
1324,Sidoney,Kenzie,Female,132287,28/09/1987,38,Denmark,9,21/04/2003,8,USD,15/03/2006


In [0]:
from pyspark.sql.functions import col 

@udf
def replace_slash(string):
    return string.replace("/", "-")


employee_stream_df_updated = employee_stream_df.withColumn(
        "Date_of_Birth",
        replace_slash(employee_stream_df["Date_of_Birth"])
    )
employee_stream_df_updated.display()

Employee_id,First_name,Last_name,Gender,salary,Date_of_Birth,Age,Country,Department_id,Date_of_Joining,Manager_id,Currency,End_Date
1379,Klarrisa,Caile,Female,123945,08-09-1980,33,Australia,4,13/05/2003,4,USD,17/08/2006
1144,Rosemaria,Shieldon,Female,105777,16-08-1987,25,Australia,19,09/06/2000,6,USD,09/12/2011
1023,Biddie,Paolone,Male,113582,17-10-1981,36,UAE,2,07/06/2007,10,USD,17/11/2018
1437,Abner,Ellerington,Male,112349,24-08-1990,38,Germay,5,23/02/2011,1,USD,28/11/2018
1247,Nils,Ilyinski,Female,111907,27-03-1986,29,Canada,4,27/05/2003,3,USD,20/05/2006
1128,Patrizius,Sanpher,Female,146209,08-02-1995,65,UAE,12,20/01/2010,1,USD,17/08/2012
1373,Lindy,Copestick,Female,104192,10-08-1986,50,Sweden,1,15/05/2011,2,USD,13/05/2016
1442,Allegra,Syseland,Female,100917,14-04-1998,63,Denmark,7,05/10/2010,6,USD,25/05/2010
1042,Trude,Phippen,Male,131623,30-05-1991,64,Sweden,2,12/10/2004,4,USD,12/09/2013
1324,Sidoney,Kenzie,Female,132287,28-09-1987,38,Denmark,9,21/04/2003,8,USD,15/03/2006


In [0]:
# write stream method to write data to a table in memory
df.writeStream \
    .format("memory")\
    .queryName("premium laptops 20")\
    .trigger(processingTime="20 seconds")
    .start()

spark.sql("select company, count(*) from premium laptops 20 group by department_id ")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-399819407147986>:1[0m
[0;32m----> 1[0m [38;5;28;43mlen[39;49m[43m([49m[43memployee_stream_df[49m[43m)[49m

[0;31mTypeError[0m: object of type 'DataFrame' has no len()

### streaming data using Autoloader

In [0]:
employee_stream_data = spark.readStream.format("cloudFiles")\
    .option("cloudFiles.format", "csv")\
    .option("cloudFiles.schemalocation",
            "dbfs:/FileStore/tables/datasets/laptop_source_stream")\
    # .option("cloudFiles.schemaHints", "Employee_id int, First_Name str, Last_Name str, Gender str, etc")
    .load("dbfs:/FileStore/tables/datasets/laptop_source_stream")

In [0]:
employee_stream_data.display()

Employee_id,First_Name,Last_Name,Gender,Salary,Date_of_Birth,Age,Country,Department_id,Date_of_Joining,Manager_id,Currency,End_Date,_rescued_data
1379,Klarrisa,Caile,Female,123945,08/09/1980,33,Australia,4,13/05/2003,4,USD,17/08/2006,
1144,Rosemaria,Shieldon,Female,105777,16/08/1987,25,Australia,19,09/06/2000,6,USD,09/12/2011,
1023,Biddie,Paolone,Male,113582,17/10/1981,36,UAE,2,07/06/2007,10,USD,17/11/2018,
1437,Abner,Ellerington,Male,112349,24/08/1990,38,Germay,5,23/02/2011,1,USD,28/11/2018,
1247,Nils,Ilyinski,Female,111907,27/03/1986,29,Canada,4,27/05/2003,3,USD,20/05/2006,
1128,Patrizius,Sanpher,Female,146209,08/02/1995,65,UAE,12,20/01/2010,1,USD,17/08/2012,
1373,Lindy,Copestick,Female,104192,10/08/1986,50,Sweden,1,15/05/2011,2,USD,13/05/2016,
1442,Allegra,Syseland,Female,100917,14/04/1998,63,Denmark,7,05/10/2010,6,USD,25/05/2010,
1042,Trude,Phippen,Male,131623,30/05/1991,64,Sweden,2,12/10/2004,4,USD,12/09/2013,
1324,Sidoney,Kenzie,Female,132287,28/09/1987,38,Denmark,9,21/04/2003,8,USD,15/03/2006,


In [0]:
employee_stream_subset = employee_stream_data.filter(employee_stream_data.Country == "Australia")\
    .select("Employee_id", "First_Name", "Last_Name", "Gender", "Salary", "Country")

In [0]:
employee_stream_subset.display()

Employee_id,First_Name,Last_Name,Gender,Salary,Country
1379,Klarrisa,Caile,Female,123945,Australia
1144,Rosemaria,Shieldon,Female,105777,Australia
1150,Gilda,Carlino,Female,136335,Australia
1385,Luci,Layne,Male,119403,Australia
1364,Simmonds,Denyukhin,Female,138014,Australia
1160,Reagan,Wisson,Male,104618,Australia
1053,Yardley,Brawn,Male,82019,Australia
1448,Erik,Scottini,Male,97371,Australia
1457,Sarge,Garmon,Female,139781,Australia
1188,Farley,McGinnell,Female,91986,Australia


In [0]:
# dbutils.fs.ls("dbfs:/FileStore/tables/datasets/laptop_source_stream")
# dbutils.fs.rm("dbfs:/FileStore/tables/datasets/laptop_source_stream/SourceA_01122020.csv")
# dbutils.fs.head("dbfs:/FileStore/tables/datasets/laptop_source_stream/_schemas/0")
# dbutils.fs.mkdirs("dbfs:/FileStore/tables/datasets/dest_location")
dbutils.fs.ls("dbfs:/FileStore/tables/datasets/dest_location/")

Out[43]: [FileInfo(path='dbfs:/FileStore/tables/datasets/dest_location/_spark_metadata/', name='_spark_metadata/', size=0, modificationTime=1685618855225),
 FileInfo(path='dbfs:/FileStore/tables/datasets/dest_location/checkpoint_1/', name='checkpoint_1/', size=0, modificationTime=1685618855225),
 FileInfo(path='dbfs:/FileStore/tables/datasets/dest_location/part-00000-0a4c2ed0-23ed-4949-bdc2-c9e8ec551287-c000.csv', name='part-00000-0a4c2ed0-23ed-4949-bdc2-c9e8ec551287-c000.csv', size=1797, modificationTime=1685617986000),
 FileInfo(path='dbfs:/FileStore/tables/datasets/dest_location/part-00000-b77544a1-ae8c-47ec-8aa0-ad5135fd3b51-c000.csv', name='part-00000-b77544a1-ae8c-47ec-8aa0-ad5135fd3b51-c000.csv', size=2236, modificationTime=1685618668000),
 FileInfo(path='dbfs:/FileStore/tables/datasets/dest_location/part-00001-33dc6716-f5a8-4f11-932e-56c54450d755-c000.csv', name='part-00001-33dc6716-f5a8-4f11-932e-56c54450d755-c000.csv', size=2366, modificationTime=1685617985000),
 FileInfo(pat

In [0]:
employee_stream_subset.writeStream \
    .option("mergeSchema", "true")\
    .format("csv")\
    .option("checkpointLocation",
            "dbfs:/FileStore/tables/datasets/dest_location/checkpoint_1")\
    .start("dbfs:/FileStore/tables/datasets/dest_location/")

Out[37]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f18df36b580>

Out[22]: 'v1\n{"dataSchemaJson":"{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"Employee_id\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"First_Name\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Last_Name\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Gender\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Salary\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Date_of_Birth\\",\\"type\\":\\"date\\",\\"nullable\\":true,\\"metadata\\":{\\"__detected_date_formats\\":\\"d/M/yyyy\\"}},{\\"name\\":\\"Age\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Country\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Department_id\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Date_of_Joining\\",\\"type\\":\\"date\\",\\"nullabl