# Spark Structured Streaming Project

Project contains Apache Spark Streaming. Apache Spark write streams to postgre sql, apache hive and deltalake with processing read stream dataframe.

## Streaming CSV to Folder

In [17]:
# python dataframe_to_log.py -idx True -i input/iot_telemetry_data.csv

As the above, run python file at console. This provides streaming csv to folder.

## Import Libraries & SparkSession

In [1]:
from pyspark.sql import SparkSession, functions as F

import findspark
from spark_utils import *

findspark.init("C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")

In [2]:
spark = (SparkSession.builder
        .master("yarn")
        .appName("Spark Streaming with Multiple Sink")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        .config("spark.sql.adaptive.enabled", True)
        .enableHiveSupport()
        .getOrCreate())

Create spark session with deltalake and apache hive configurations. After that import to delta library.

In [3]:
from delta.tables import *

## IOT Schema

In [4]:
iot_schema = "row_id int, " \
    "ts double, " \
    "device string, " \
    "co float, " \
    "humidity float, " \
    "light boolean, " \
    "lpg float, " \
    "motion boolean, " \
    "smoke float, " \
    "temp float, " \
    "time timestamp"

Make schema for dataframe on the above because spark can read data types wrongly in order that set manually to schema.

In [4]:
file_path = "file:///Users/talha/OneDrive/Masaüstü/Talha Nebi Kumru/Data Enginnering/Miuul/RealTimeDPWithSpark/Spark_Streaming_with_Multiple_Sink/output"
delta_path = "/user/talha/datasets/delta-stream"

## Read Stream

In [6]:
stream_df = (spark.readStream
            .format("csv")
            .schema(iot_schema)
            .option("header", True)
            .option("maxFilesPerTrigger", 1)
            .load(file_path))

## Multiple Sink

In [8]:
def write_multiple(df, batchID):
    df.persist()
    
    print(df.limit(5).toPandas().head())
    
    # Write to PostgreSQL
    write_psql(df.filter(" device == '00:0f:00:70:91:0a' "), 
               table_name="sensor_0a", 
               password="password")
    
    # Write to Apache Hive
    write_hive(df.filter(" device == 'b8:27:eb:bf:9d:51' "), 
               mode="append", 
               table_name="sensor_51")
    
    # Write to Deltalake
    write_deltalake(df.filter(" device == '1c:bf:ce:15:ec:4d' "), 
                    mode="append", 
                    path=delta_path)
    
    df.unpersist()

This function provides appending dataframe to psql, hive and deltalake.

## Write Stream

In [9]:
streaming_query = (
    stream_df.writeStream
        .foreachBatch(write_multiple)
        .option("checkpointLocation", "checkpoint")
        .start()
)
streaming_query.awaitTermination(100)

  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0       1  1.594512e+09  ...  19.700001 2023-02-13 01:56:59.308220
1       2  1.594512e+09  ...  22.600000 2023-02-13 01:56:59.823688
2       3  1.594512e+09  ...  27.000000 2023-02-13 01:57:00.337171
3       4  1.594512e+09  ...  22.600000 2023-02-13 01:57:00.851784
4       5  1.594512e+09  ...  27.000000 2023-02-13 01:57:01.359949

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      11  1.594512e+09  ...  27.000000 2023-02-13 01:57:04.426730
1      12  1.594512e+09  ...  22.600000 2023-02-13 01:57:04.939046
2      13  1.594512e+09  ...  27.000000 2023-02-13 01:57:05.440151
3      14  1.594512e+09  ...  22.600000 2023-02-13 01:57:05.955877
4      15  1.594512e+09  ...  19.700001 2023-02-13 01:57:06.468947

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      21  1.594512e+09  ...  27.000000 2023-02-13 01:57:09.527628
1      22  1.594512e+09  ...  22.600000 2023-02-13 01:57:10.028097
2      23  1.594512e+09  ...  19.700001 2023-02-13 01:57:10.538998
3      24  1.594512e+09  ...  22.600000 2023-02-13 01:57:11.053585
4      25  1.594512e+09  ...  19.799999 2023-02-13 01:57:11.567682

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      31  1.594512e+09  ...  22.600000 2023-02-13 01:57:14.615167
1      32  1.594512e+09  ...  27.000000 2023-02-13 01:57:15.126961
2      33  1.594512e+09  ...  19.799999 2023-02-13 01:57:15.639922
3      34  1.594512e+09  ...  22.600000 2023-02-13 01:57:16.155052
4      35  1.594512e+09  ...  22.700001 2023-02-13 01:57:16.668001

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      41  1.594512e+09  ...  19.700001 2023-02-13 01:57:19.750844
1      42  1.594512e+09  ...  27.000000 2023-02-13 01:57:20.266230
2      43  1.594512e+09  ...  22.600000 2023-02-13 01:57:20.779360
3      44  1.594512e+09  ...  22.600000 2023-02-13 01:57:21.294096
4      45  1.594512e+09  ...  27.000000 2023-02-13 01:57:21.806657

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      51  1.594512e+09  ...  22.600000 2023-02-13 01:57:24.883070
1      52  1.594512e+09  ...  27.000000 2023-02-13 01:57:25.397667
2      53  1.594512e+09  ...  19.799999 2023-02-13 01:57:25.910995
3      54  1.594512e+09  ...  22.600000 2023-02-13 01:57:26.424946
4      55  1.594512e+09  ...  27.000000 2023-02-13 01:57:26.937405

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      61  1.594512e+09  ...  27.000000 2023-02-13 01:57:29.993070
1      62  1.594512e+09  ...  19.799999 2023-02-13 01:57:30.507226
2      63  1.594512e+09  ...  22.700001 2023-02-13 01:57:31.016322
3      64  1.594512e+09  ...  22.700001 2023-02-13 01:57:31.528167
4      65  1.594512e+09  ...  19.700001 2023-02-13 01:57:32.042472

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      71  1.594512e+09  ...  19.799999 2023-02-13 01:57:35.094386
1      72  1.594512e+09  ...  22.600000 2023-02-13 01:57:35.608176
2      73  1.594512e+09  ...  27.000000 2023-02-13 01:57:36.118848
3      74  1.594512e+09  ...  22.600000 2023-02-13 01:57:36.632284
4      75  1.594512e+09  ...  27.000000 2023-02-13 01:57:37.133170

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      81  1.594512e+09  ...  22.600000 2023-02-13 01:57:40.193592
1      82  1.594512e+09  ...  19.700001 2023-02-13 01:57:40.706470
2      83  1.594512e+09  ...  27.000000 2023-02-13 01:57:41.217995
3      84  1.594512e+09  ...  22.600000 2023-02-13 01:57:41.731138
4      85  1.594512e+09  ...  22.600000 2023-02-13 01:57:42.246393

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0      91  1.594512e+09  ...  27.000000 2023-02-13 01:57:45.307237
1      92  1.594512e+09  ...  19.799999 2023-02-13 01:57:45.820802
2      93  1.594512e+09  ...  22.700001 2023-02-13 01:57:46.320929
3      94  1.594512e+09  ...  19.700001 2023-02-13 01:57:46.836034
4      95  1.594512e+09  ...  22.700001 2023-02-13 01:57:47.348987

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     101  1.594512e+09  ...  19.700001 2023-02-13 01:57:50.416871
1     102  1.594512e+09  ...  22.700001 2023-02-13 01:57:50.931802
2     103  1.594512e+09  ...  22.600000 2023-02-13 01:57:51.443802
3     104  1.594512e+09  ...  27.000000 2023-02-13 01:57:51.944431
4     105  1.594512e+09  ...  19.700001 2023-02-13 01:57:52.454405

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     111  1.594512e+09  ...  27.000000 2023-02-13 01:57:55.529672
1     112  1.594512e+09  ...  22.600000 2023-02-13 01:57:56.041497
2     113  1.594512e+09  ...  19.700001 2023-02-13 01:57:56.554181
3     114  1.594512e+09  ...  22.600000 2023-02-13 01:57:57.054437
4     115  1.594512e+09  ...  27.000000 2023-02-13 01:57:57.568609

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     121  1.594512e+09  ...  19.799999 2023-02-13 01:58:00.621200
1     122  1.594512e+09  ...  27.000000 2023-02-13 01:58:01.132823
2     123  1.594512e+09  ...  22.600000 2023-02-13 01:58:01.647252
3     124  1.594512e+09  ...  19.700001 2023-02-13 01:58:02.162162
4     125  1.594512e+09  ...  22.600000 2023-02-13 01:58:02.671364

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     131  1.594512e+09  ...  22.600000 2023-02-13 01:58:05.738884
1     132  1.594512e+09  ...  19.799999 2023-02-13 01:58:06.250179
2     133  1.594512e+09  ...  22.600000 2023-02-13 01:58:06.755026
3     134  1.594512e+09  ...  19.700001 2023-02-13 01:58:07.267582
4     135  1.594512e+09  ...  22.700001 2023-02-13 01:58:07.782948

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     141  1.594512e+09  ...  22.700001 2023-02-13 01:58:10.852953
1     142  1.594512e+09  ...  22.700001 2023-02-13 01:58:11.366826
2     143  1.594512e+09  ...  19.700001 2023-02-13 01:58:11.879149
3     144  1.594512e+09  ...  27.000000 2023-02-13 01:58:12.389185
4     145  1.594512e+09  ...  22.700001 2023-02-13 01:58:12.900097

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     151  1.594512e+09  ...  22.700001 2023-02-13 01:58:15.984130
1     152  1.594512e+09  ...  19.700001 2023-02-13 01:58:16.494538
2     153  1.594512e+09  ...  22.700001 2023-02-13 01:58:16.995272
3     154  1.594512e+09  ...  27.000000 2023-02-13 01:58:17.506689
4     155  1.594512e+09  ...  19.799999 2023-02-13 01:58:18.017368

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     161  1.594512e+09  ...  19.700001 2023-02-13 01:58:21.085332
1     162  1.594512e+09  ...  27.000000 2023-02-13 01:58:21.596877
2     163  1.594512e+09  ...  22.600000 2023-02-13 01:58:22.098013
3     164  1.594512e+09  ...  19.700001 2023-02-13 01:58:22.608188
4     165  1.594512e+09  ...  27.000000 2023-02-13 01:58:23.117833

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     171  1.594512e+09  ...  27.000000 2023-02-13 01:58:26.179605
1     172  1.594512e+09  ...  19.799999 2023-02-13 01:58:26.692472
2     173  1.594512e+09  ...  22.700001 2023-02-13 01:58:27.193792
3     174  1.594512e+09  ...  27.000000 2023-02-13 01:58:27.705817
4     175  1.594512e+09  ...  22.700001 2023-02-13 01:58:28.214685

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     181  1.594512e+09  ...  27.000000 2023-02-13 01:58:31.294593
1     182  1.594512e+09  ...  19.700001 2023-02-13 01:58:31.807274
2     183  1.594512e+09  ...  22.700001 2023-02-13 01:58:32.321794
3     184  1.594512e+09  ...  27.100000 2023-02-13 01:58:32.832097
4     185  1.594512e+09  ...  22.700001 2023-02-13 01:58:33.343868

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     191  1.594512e+09  ...  27.000000 2023-02-13 01:58:36.398675
1     192  1.594512e+09  ...  22.700001 2023-02-13 01:58:36.914082
2     193  1.594512e+09  ...  27.100000 2023-02-13 01:58:37.427931
3     194  1.594512e+09  ...  22.700001 2023-02-13 01:58:37.942830
4     195  1.594512e+09  ...  22.700001 2023-02-13 01:58:38.457325

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     201  1.594512e+09  ...  19.799999 2023-02-13 01:58:41.539502
1     202  1.594512e+09  ...  22.700001 2023-02-13 01:58:42.050761
2     203  1.594512e+09  ...  19.799999 2023-02-13 01:58:42.564976
3     204  1.594512e+09  ...  22.700001 2023-02-13 01:58:43.077864
4     205  1.594512e+09  ...  22.700001 2023-02-13 01:58:43.591030

[5 rows x 11 columns]


False

  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     211  1.594512e+09  ...  27.000000 2023-02-13 01:58:46.660827
1     212  1.594512e+09  ...  22.700001 2023-02-13 01:58:47.173654
2     213  1.594512e+09  ...  19.799999 2023-02-13 01:58:47.682781
3     214  1.594512e+09  ...  27.000000 2023-02-13 01:58:48.194739
4     215  1.594512e+09  ...  22.700001 2023-02-13 01:58:48.705845

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     221  1.594512e+09  ...  22.600000 2023-02-13 01:58:51.788264
1     222  1.594512e+09  ...  27.000000 2023-02-13 01:58:52.303511
2     223  1.594512e+09  ...  22.600000 2023-02-13 01:58:52.804484
3     224  1.594512e+09  ...  19.799999 2023-02-13 01:58:53.319255
4     225  1.594512e+09  ...  22.600000 2023-02-13 01:58:53.831008

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     231  1.594512e+09  ...  27.000000 2023-02-13 01:58:56.903922
1     232  1.594512e+09  ...  22.600000 2023-02-13 01:58:57.419457
2     233  1.594512e+09  ...  19.799999 2023-02-13 01:58:57.955926
3     234  1.594512e+09  ...  22.700001 2023-02-13 01:58:58.462332
4     235  1.594512e+09  ...  27.000000 2023-02-13 01:58:58.974661

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     241  1.594513e+09  ...  19.799999 2023-02-13 01:59:02.045940
1     242  1.594513e+09  ...  22.600000 2023-02-13 01:59:02.558266
2     243  1.594513e+09  ...  19.799999 2023-02-13 01:59:03.070681
3     244  1.594513e+09  ...  27.000000 2023-02-13 01:59:03.586029
4     245  1.594513e+09  ...  22.600000 2023-02-13 01:59:04.095504

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     251  1.594513e+09  ...  27.000000 2023-02-13 01:59:07.171942
1     252  1.594513e+09  ...  22.600000 2023-02-13 01:59:07.686019
2     253  1.594513e+09  ...  19.700001 2023-02-13 01:59:08.201289
3     254  1.594513e+09  ...  22.600000 2023-02-13 01:59:08.713053
4     255  1.594513e+09  ...  22.600000 2023-02-13 01:59:09.227627

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     261  1.594513e+09  ...  19.700001 2023-02-13 01:59:12.313067
1     262  1.594513e+09  ...  22.600000 2023-02-13 01:59:12.827166
2     263  1.594513e+09  ...  27.000000 2023-02-13 01:59:13.337243
3     264  1.594513e+09  ...  19.700001 2023-02-13 01:59:13.837252
4     265  1.594513e+09  ...  22.600000 2023-02-13 01:59:14.340709

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     271  1.594513e+09  ...  19.700001 2023-02-13 01:59:17.399635
1     272  1.594513e+09  ...  22.600000 2023-02-13 01:59:17.900875
2     273  1.594513e+09  ...  27.000000 2023-02-13 01:59:18.413638
3     274  1.594513e+09  ...  22.600000 2023-02-13 01:59:18.928033
4     275  1.594513e+09  ...  19.700001 2023-02-13 01:59:19.446299

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     281  1.594513e+09  ...  22.600000 2023-02-13 01:59:22.531427
1     282  1.594513e+09  ...  19.700001 2023-02-13 01:59:23.044938
2     283  1.594513e+09  ...  27.000000 2023-02-13 01:59:23.554542
3     284  1.594513e+09  ...  22.600000 2023-02-13 01:59:24.064710
4     285  1.594513e+09  ...  19.700001 2023-02-13 01:59:24.578465

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     291  1.594513e+09  ...  19.700001 2023-02-13 01:59:27.644425
1     292  1.594513e+09  ...  27.000000 2023-02-13 01:59:28.157521
2     293  1.594513e+09  ...  22.500000 2023-02-13 01:59:28.672659
3     294  1.594513e+09  ...  22.500000 2023-02-13 01:59:29.185144
4     295  1.594513e+09  ...  27.000000 2023-02-13 01:59:29.700028

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     301  1.594513e+09  ...  19.700001 2023-02-13 01:59:32.749898
1     302  1.594513e+09  ...  22.500000 2023-02-13 01:59:33.262008
2     303  1.594513e+09  ...  27.000000 2023-02-13 01:59:33.763576
3     304  1.594513e+09  ...  22.500000 2023-02-13 01:59:34.277095
4     305  1.594513e+09  ...  27.000000 2023-02-13 01:59:34.791901

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     311  1.594513e+09  ...  19.700001 2023-02-13 01:59:37.869220
1     312  1.594513e+09  ...  27.000000 2023-02-13 01:59:38.384578
2     313  1.594513e+09  ...  22.400000 2023-02-13 01:59:38.897140
3     314  1.594513e+09  ...  19.700001 2023-02-13 01:59:39.398219
4     315  1.594513e+09  ...  22.500000 2023-02-13 01:59:39.912922

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     321  1.594513e+09  ...  19.700001 2023-02-13 01:59:42.997403
1     322  1.594513e+09  ...  27.000000 2023-02-13 01:59:43.512951
2     323  1.594513e+09  ...  22.500000 2023-02-13 01:59:44.013312
3     324  1.594513e+09  ...  22.400000 2023-02-13 01:59:44.527875
4     325  1.594513e+09  ...  19.700001 2023-02-13 01:59:45.029441

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...       temp                       time
0     331  1.594513e+09  ...  27.000000 2023-02-13 01:59:48.094689
1     332  1.594513e+09  ...  22.400000 2023-02-13 01:59:48.597005
2     333  1.594513e+09  ...  19.700001 2023-02-13 01:59:49.112011
3     334  1.594513e+09  ...  22.400000 2023-02-13 01:59:49.625096
4     335  1.594513e+09  ...  27.000000 2023-02-13 01:59:50.140641

[5 rows x 11 columns]


  series = series.astype(t, copy=False)


   row_id            ts  ...  temp                       time
0     341  1.594513e+09  ...  22.4 2023-02-13 01:59:53.193341
1     342  1.594513e+09  ...  22.4 2023-02-13 01:59:53.705529
2     343  1.594513e+09  ...  27.0 2023-02-13 01:59:54.218743
3     344  1.594513e+09  ...  22.4 2023-02-13 01:59:54.730727
4     345  1.594513e+09  ...  19.6 2023-02-13 01:59:55.244762

[5 rows x 11 columns]


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\talha\anaconda3\lib\site-packages\py4j\clientserver.py", line 467, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\talha\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\talha\anaconda3\lib\site-packages\py4j\clientserver.py", line 470, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "C:\Users\talha\anaconda3\lib\site-packages\py4j\clientserver.py", line 581, in _call_proxy
   

## Control

Control to is saved data in psql, hive and deltalake.

In [13]:
psql_df = read_psql(spark, table_name="sensor_0a", password="password")

In [15]:
psql_df.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,row_id,ts,device,co,humidity,light,lpg,motion,smoke,temp,time
0,1,1594512000.0,00:0f:00:70:91:0a,0.00284,76.0,False,0.005114,False,0.013275,19.700001,2023-02-13 01:56:59.308220
1,7,1594512000.0,00:0f:00:70:91:0a,0.002938,76.0,False,0.005241,False,0.013628,19.700001,2023-02-13 01:57:02.388916
2,15,1594512000.0,00:0f:00:70:91:0a,0.002905,75.800003,False,0.005199,False,0.013509,19.700001,2023-02-13 01:57:06.468947
3,23,1594512000.0,00:0f:00:70:91:0a,0.00284,76.0,False,0.005114,False,0.013275,19.700001,2023-02-13 01:57:10.538998
4,25,1594512000.0,00:0f:00:70:91:0a,0.00284,76.0,False,0.005114,False,0.013275,19.799999,2023-02-13 01:57:11.567682


In [9]:
hive_df = read_hive(spark, db_name="test", table_name="sensor_51")

In [12]:
hive_df.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,row_id,ts,device,co,humidity,light,lpg,motion,smoke,temp,time
0,341,1594513000.0,b8:27:eb:bf:9d:51,0.004973,49.200001,False,0.00767,False,0.020466,22.4,2023-02-13 01:59:53.193341
1,342,1594513000.0,b8:27:eb:bf:9d:51,0.004988,49.200001,False,0.007686,False,0.020512,22.4,2023-02-13 01:59:53.705529
2,344,1594513000.0,b8:27:eb:bf:9d:51,0.004972,49.299999,False,0.007668,False,0.020461,22.4,2023-02-13 01:59:54.730727
3,346,1594513000.0,b8:27:eb:bf:9d:51,0.004988,49.200001,False,0.007686,False,0.020512,22.299999,2023-02-13 01:59:55.758815
4,348,1594513000.0,b8:27:eb:bf:9d:51,0.004982,49.299999,False,0.00768,False,0.020493,22.4,2023-02-13 01:59:56.782545


In [5]:
delta_df = read_delta(spark, delta_path)

In [7]:
delta_df.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,row_id,ts,device,co,humidity,light,lpg,motion,smoke,temp,time
0,343,1594513000.0,1c:bf:ce:15:ec:4d,0.004342,77.599998,True,0.006952,False,0.018427,27.0,2023-02-13 01:59:54.218743
1,347,1594513000.0,1c:bf:ce:15:ec:4d,0.004291,77.599998,True,0.006894,False,0.018261,27.0,2023-02-13 01:59:56.273850


## Stop

In [16]:
spark.stop()