In [0]:
pip install azure-storage-queue

Python interpreter will be restarted.
Collecting azure-storage-queue
  Downloading azure_storage_queue-12.6.0-py3-none-any.whl (170 kB)
Collecting isodate>=0.6.1
  Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB)
Collecting azure-core<2.0.0,>=1.26.0
  Downloading azure_core-1.28.0-py3-none-any.whl (185 kB)
Collecting typing-extensions>=4.0.1
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Installing collected packages: typing-extensions, isodate, azure-core, azure-storage-queue
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 4.1.1
    Not uninstalling typing-extensions at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-99058a41-7460-42f2-87b2-d2562ca0fddc
    Can't uninstall 'typing-extensions'. No files were found to uninstall.
Successfully installed azure-core-1.28.0 azure-storage-queue-12.6.0 isodate-0.6.1 typing-extensions-4.7.1
Python interpreter will b

In [0]:
import csv
import io
from functools import reduce

from azure.storage.queue import QueueClient,BinaryBase64EncodePolicy,BinaryBase64DecodePolicy
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, to_timestamp
from pyspark.sql.types import DoubleType, IntegerType, DateType

In [0]:
# Fetch the storage connection string and the database credentials from the
# 'secretkv' secret scope so that no secret value is written in the notebook.
conn_str, user, password = (
    dbutils.secrets.get('secretkv', secret_name)
    for secret_name in ('storageaccountconnstr', 'db-user', 'db-password')
)

# SQL Server JDBC connection string, assembled from adjacent literals (one
# property per line); the user name and password are interpolated into it.
jdbc_url = (
    'jdbc:sqlserver://jedrzejsmokdbserver.database.windows.net:1433;'
    'database=jedrzejsmok-poc;'
    f'user={user}@jedrzejsmokdbserver;'
    f'password={password};'
    'encrypt=true;'
    'trustServerCertificate=false;'
    'hostNameInCertificate=*.database.windows.net;'
    'loginTimeout=30;'
)

In [0]:
def _parse_csv_message(data_str):
    """Parse one CSV payload into ``(header, rows)``; blank lines are ignored.

    Returns ``([], [])`` when the payload contains no non-empty line.
    """
    # newline="" hands line endings to csv.reader untouched, so CRLF and LF
    # payloads are both handled and quoted fields may contain commas.
    parsed = [row for row in csv.reader(io.StringIO(data_str, newline="")) if row]
    if not parsed:
        return [], []
    return parsed[0], parsed[1:]


def get_data_from_queue_generator(name):
    """Yield one Spark DataFrame per CSV message received from a storage queue.

    Each message body is decoded as UTF-8 text and parsed as CSV: the first
    non-empty line supplies the column names and every following non-empty
    line becomes a row. All values are passed to Spark as strings; callers
    cast the columns afterwards.

    Messages that are empty or contain only a header are skipped, because
    they carry no rows to load.

    Args:
        name: Name of the queue to read from (the connection string comes
            from the notebook-level ``conn_str``).

    Yields:
        A DataFrame built with ``spark.createDataFrame`` for each message
        that has a header and at least one data row.
    """
    queue = QueueClient.from_connection_string(
        conn_str=conn_str,
        queue_name=name,
        message_encode_policy=BinaryBase64EncodePolicy(),
        message_decode_policy=BinaryBase64DecodePolicy())
    # NOTE(review): max_messages looks like a cap on the total number of
    # messages retrieved, not a page size - confirm 32 is the intended limit.
    messages = queue.receive_messages(max_messages=32)
    for msg_batch in messages.by_page():
        for msg in msg_batch:
            data_str = msg.content.decode("utf8")
            # Deletion is currently disabled, so this function does not remove
            # the messages it has read from the queue.
            #queue.delete_message(msg)
            header, rows = _parse_csv_message(data_str)
            if not header or not rows:
                continue
            yield spark.createDataFrame(data=rows, schema=header)

In [0]:
# Collect one DataFrame per message from the rain queue.
dfs = list(get_data_from_queue_generator("rainsqueue"))
if not dfs:
    # reduce() on an empty list fails with an unhelpful TypeError and would
    # leave a stale `df` from an earlier cell in scope for the write below.
    raise ValueError("No messages received from 'rainsqueue'; nothing to load.")

# Stack the per-message frames and drop rows that are exact duplicates.
df = reduce(DataFrame.unionAll, dfs).dropDuplicates()

# Cast the columns to their target types; 'Name' is left unchanged.
df = (
    df
    .withColumn('Lat', col('Lat').cast(DoubleType()))
    .withColumn('Lng', col('Lng').cast(DoubleType()))
    .withColumn('Date', to_date(col('Date'), 'yyyy-MM-dd'))
    .withColumn('PR1M', col('PR1M').cast(DoubleType()))
)

# A bare `df.schema` in the middle of a cell displays nothing; print it.
df.printSchema()
df.show()


+-------------------+--------+--------+----------+-----+
|               Name|     Lat|     Lng|      Date| PR1M|
+-------------------+--------+--------+----------+-----+
|     Boguslawskiego|51.10068|17.02814|2023-07-12|  0.0|
|              Dobra|51.16078|17.11884|2023-07-12|2.427|
|            Milicka|51.15266| 17.0425|2023-07-12|  0.0|
|            Slazowa|51.15252|16.99543|2023-07-12| null|
|         Bystrzycka| 51.1181|16.97265|2023-07-12| null|
|           Terenowa|51.07023|  17.046|2023-07-12| null|
|           Wedkarzy|  51.193|16.93361|2023-07-12| null|
|      Jagniatkowska|51.14771|16.90064|2023-07-12|  0.0|
|           Gitarowa|51.10417|17.15866|2023-07-12| null|
|Powstancow Slaskich|51.07853|17.00492|2023-07-12|  0.0|
+-------------------+--------+--------+----------+-----+



In [0]:
# Write the current DataFrame to rains_table over JDBC in overwrite mode.
df.write \
    .format("jdbc") \
    .options(url=jdbc_url, dbtable="rains_table") \
    .mode("overwrite") \
    .save()

In [0]:
# Collect one DataFrame per message from the weather queue.
dfs = list(get_data_from_queue_generator("weatherqueue"))
if not dfs:
    # reduce() on an empty list fails with an unhelpful TypeError and would
    # leave a stale `df` from an earlier cell in scope for the write below.
    raise ValueError("No messages received from 'weatherqueue'; nothing to load.")

# Stack the per-message frames and drop rows that are exact duplicates.
df = reduce(DataFrame.unionAll, dfs).dropDuplicates()

# Cast the columns to their target types; 'Opad_Typ' and 'Lokalizacja_Opis'
# are left unchanged.
df = (
    df
    .withColumn('_id', col('_id').cast(IntegerType()))
    .withColumn('Czas_Rejestracji', to_timestamp(col('Czas_Rejestracji'), 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('Wiatr_V', col('Wiatr_V').cast(DoubleType()))
    .withColumn('Wiatr_Kierunek', col('Wiatr_Kierunek').cast(DoubleType()))
    .withColumn('Wilgotnosc', col('Wilgotnosc').cast(DoubleType()))
    .withColumn('T_Powietrza', col('T_Powietrza').cast(DoubleType()))
    .withColumn('T_Grunt', col('T_Grunt').cast(DoubleType()))
)

# A bare `df.schema` in the middle of a cell displays nothing; print it.
df.printSchema()
df.show()

+---+-------------------+-------+--------------+----------+-----------+-------+----------+--------------------+
|_id|   Czas_Rejestracji|Wiatr_V|Wiatr_Kierunek|Wilgotnosc|T_Powietrza|T_Grunt|  Opad_Typ|    Lokalizacja_Opis|
+---+-------------------+-------+--------------+----------+-----------+-------+----------+--------------------+
|  2|2022-10-06 09:40:00|    0.0|         246.0|     71.88|      17.77|   null|Brak opadu|MOST ROMANA DMOWS...|
|  4|2022-10-06 09:40:00|    2.0|         270.0|       5.2|       17.9|   14.4|Brak opadu|UL. OPOLSKA / UL....|
|  6|2022-10-06 09:40:00|   1.81|         196.0|     66.91|      17.17|  18.16|Brak opadu| ESTAKADA GADOWIANKA|
|  8|2022-10-06 09:40:00|    2.5|          69.0|      66.7|       19.7|   18.2|Brak opadu|     MOST WARSZAWSKI|
|  7|2022-10-06 09:40:00|    0.0|         215.0|      65.8|      17.12|  17.32|Brak opadu|      MOST MILENIJNY|
|  1|2022-10-06 09:40:00|    1.2|         237.0|      78.6|       16.7|   19.6|Brak opadu|UL. LOTNICZA /

In [0]:
# Write the current DataFrame to weather_table over JDBC in overwrite mode.
df.write \
    .format("jdbc") \
    .options(url=jdbc_url, dbtable="weather_table") \
    .mode("overwrite") \
    .save()

In [0]:
# Collect one DataFrame per message from the parking queue.
dfs = list(get_data_from_queue_generator("parkingqueue"))
if not dfs:
    # reduce() on an empty list fails with an unhelpful TypeError and would
    # leave a stale `df` from an earlier cell in scope for the write below.
    raise ValueError("No messages received from 'parkingqueue'; nothing to load.")

# Stack the per-message frames and drop rows that are exact duplicates.
df = reduce(DataFrame.unionAll, dfs).dropDuplicates()

# Cast the columns to their target types; 'Nazwa' is left unchanged.
# The registration time carries fractional seconds, hence the '.SSSSSS'.
df = (
    df
    .withColumn('_id', col('_id').cast(IntegerType()))
    .withColumn('Czas_Rejestracji', to_timestamp(col('Czas_Rejestracji'), 'yyyy-MM-dd HH:mm:ss.SSSSSS'))
    .withColumn('Liczba_Wolnych_Miejsc', col('Liczba_Wolnych_Miejsc').cast(IntegerType()))
    .withColumn('Liczba_Poj_Wjezdzajacych', col('Liczba_Poj_Wjezdzajacych').cast(IntegerType()))
    .withColumn('Liczba_Poj_Wyjezdzajacych', col('Liczba_Poj_Wyjezdzajacych').cast(IntegerType()))
)

# A bare `df.schema` in the middle of a cell displays nothing; print it.
df.printSchema()
df.show()

+---+--------------------+---------------------+------------------------+-------------------------+--------------------+
|_id|    Czas_Rejestracji|Liczba_Wolnych_Miejsc|Liczba_Poj_Wjezdzajacych|Liczba_Poj_Wyjezdzajacych|               Nazwa|
+---+--------------------+---------------------+------------------------+-------------------------+--------------------+
|  1|2023-07-14 14:10:...|                 -383|                       0|                        1|PR11 - ul. ÅlÄÅ...|
|  2|2023-07-14 11:12:...|                   38|                       1|                        0|PR05 - ul. Bardzk...|
|  7|2023-07-14 14:13:...|                -3148|                       0|                        0|PR15 - ul. Kamien...|
|  8|2023-07-14 13:07:...|                   52|                       1|                        0|PR03B - Grabiszyn...|
| 13|2022-08-01 12:15:...|                  198|                       6|                        6|           Nowy Targ|
| 14|2023-04-03 08:22:...|      

In [0]:
# Write the current DataFrame to parking_table over JDBC in overwrite mode.
df.write \
    .format("jdbc") \
    .options(url=jdbc_url, dbtable="parking_table") \
    .mode("overwrite") \
    .save()

In [0]:
# Collect one DataFrame per message from the arrivals queue.
dfs = list(get_data_from_queue_generator("arrivedqueue"))
if not dfs:
    # reduce() on an empty list fails with an unhelpful TypeError and would
    # leave a stale `df` from an earlier cell in scope for the write below.
    raise ValueError("No messages received from 'arrivedqueue'; nothing to load.")

# Stack the per-message frames and drop rows that are exact duplicates.
df = reduce(DataFrame.unionAll, dfs).dropDuplicates()

# Cast the columns to their target types; 'GMINA' is left unchanged.
# NOTE(review): 'DATA' is parsed with a date-only pattern but stored as a
# timestamp; to_date would change the column type of the written table, so
# it is kept as-is - confirm which type is wanted.
df = (
    df
    .withColumn('_id', col('_id').cast(IntegerType()))
    .withColumn('DATA', to_timestamp(col('DATA'), 'yyyy-MM-dd'))
    .withColumn('LICZBA_POJAZDOW', col('LICZBA_POJAZDOW').cast(IntegerType()))
)

# A bare `df.schema` in the middle of a cell displays nothing; print it.
df.printSchema()
df.show()

+---+-------------------+---------------+---------------+
|_id|               DATA|          GMINA|LICZBA_POJAZDOW|
+---+-------------------+---------------+---------------+
|  6|2022-09-26 00:00:00|           Inne|          28830|
|  5|2022-09-26 00:00:00|   GÃ³ra powiat|            228|
|  4|2022-09-26 00:00:00|          GÃ³ra|            204|
| 14|2022-09-26 00:00:00|          Lubin|            645|
| 15|2022-09-26 00:00:00|        LwÃ³wek|            163|
| 13|2022-09-26 00:00:00|         LubaÅ|            178|
| 23|2022-09-26 00:00:00|      Trzebnica|           3482|
| 22|2022-09-26 00:00:00|      Åwidnica|           1171|
| 24|2022-09-26 00:00:00|     WaÅbrzych|            607|
| 19|2022-09-26 00:00:00|      Polkowice|            206|
| 21|2022-09-26 00:00:00|Åroda ÅlÄska|           1699|
| 20|2022-09-26 00:00:00|       Strzelin|            516|
| 28|2022-09-26 00:00:00|WrocÅaw powiat|           9074|
| 31|2022-09-26 00:00:00|     ZÅotoryja|            186|
| 30|2022-09-2

In [0]:
# Write the current DataFrame to arrived_table over JDBC in overwrite mode.
df.write \
    .format("jdbc") \
    .options(url=jdbc_url, dbtable="arrived_table") \
    .mode("overwrite") \
    .save()