In [0]:

from datetime import timedelta, datetime
import math
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType
import dbldatagen as dg
import urllib
from pyspark.sql.functions import col

# Time window for the synthetic `time` column: generated timestamps run from
# `start` to `end`, spaced `interval` apart.
# (An earlier run used the window 2022-12-05 06:00 -> 2023-01-05 12:00.)
interval = timedelta(seconds=1)
start, end = datetime(2023, 1, 17, 6), datetime(2023, 1, 31, 12)
# Reference data: the `default.judete` table, which (judging by its output
# below) holds a county, its abbreviation, a city, a street and lon/lat per row.
countiesDF = spark.read.table("default.judete")
countiesDF.show()

# Pull all four lookup columns to the driver in ONE job. The previous version
# called toPandas() once per column: four separate Spark jobs over the same
# table, with no guarantee that each job returns rows in the same order, so the
# lists could silently fall out of alignment. The lists are sampled
# independently by the generator today, but keeping them aligned lets them be
# zipped safely if county/city/street consistency is added later.
county_rows = countiesDF.select("county", "county_abv", "city", "street").collect()
counties = [row["county"] for row in county_rows]
counties_abv = [row["county_abv"] for row in county_rows]
cities = [row["city"] for row in county_rows]
streets = [row["street"] for row in county_rows]

# Vocabularies the generator samples from for the car description columns.
car_types = "hatchback sedan suv".split()
car_brands = "dacia bmw mercedes volkswagen".split()
car_colors = "red green blue white black".split()

# Per-city coordinates, shown in the notebook and persisted as their own table
# (overwritten on every run).
coordDf = countiesDF.select(col("city"), col("lon"), col("lat"))
display(coordDf)
coordDf.write.saveAsTable("default.city_coords", mode="overwrite")

# Target schema for generated rows: a `time` timestamp followed by eight
# string columns; every field is nullable.
_traffic_string_columns = [
    "reg_no", "reg_county",
    "car_type", "car_color", "car_brand",
    "county", "city", "street",
]
schema = StructType(
    [StructField("time", TimestampType(), nullable=True)]
    + [StructField(name, StringType(), nullable=True) for name in _traffic_string_columns]
)

# Delta sink for the generated stream, plus the checkpoint directory that
# Structured Streaming uses to track the query's progress.
CAR_TRAFFIC_DATA_PATH = "/tmp/dbldatagen/carDatatry/data"
CAR_TRAFFIC_CHECKPOINT_PATH = "/tmp/dbldatagen/carDatatry/checkpoint_car"

# Generator spec. dbldatagen adds an implicit `id` column holding each row's
# ordinal; every other column comes from `schema` and is refined below.
ds = (dg.DataGenerator(spark, name="car_traffic_data", rows=20000, partitions=2)
      .withSchema(schema)
      # withColumnSpec refines a column already declared in `schema`, so the
      # type comes from the schema. The former positional "timestamp" argument
      # landed in `minValue`, which is unused for timestamp columns; it is dropped.
      .withColumnSpec("time", begin=start, end=end, interval=interval)
      .withColumnSpec("reg_no", template=r'DD-AAA')
      # Each column below samples its own lookup list at random. They are drawn
      # independently, so e.g. `city` need not belong to `county`.
      # TODO(review): use dependent columns if geographic consistency matters.
      .withColumnSpec("reg_county", values=counties_abv, random=True)
      .withColumnSpec("car_type", values=car_types, random=True)
      .withColumnSpec("car_color", values=car_colors, random=True)
      .withColumnSpec("car_brand", values=car_brands, random=True)
      .withColumnSpec("county", values=counties, random=True)
      .withColumnSpec("city", values=cities, random=True)
      .withColumnSpec("street", values=streets, random=True))

# Streaming DataFrame; `rowsPerSecond` is passed through to the streaming source.
df = ds.build(withStreaming=True, options={'rowsPerSecond': 10})

# Keep a handle on the query: the stream runs until stopped, and without the
# handle there is no direct way to call `car_traffic_query.stop()` from here.
car_traffic_query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CAR_TRAFFIC_CHECKPOINT_PATH)
    .start(CAR_TRAFFIC_DATA_PATH))
car_traffic_query

+----------+---------------+-----------+---------------+-----+-----+
|county_abv|         county|       city|         street|  lon|  lat|
+----------+---------------+-----------+---------------+-----+-----+
|        AB|           Alba| Alba-Iulia|    1 Decembrie|23.57|46.07|
|        AG|          Argeș|    Pitesti|    Aviatorilor|24.88|44.86|
|        AR|           Arad|       Arad|      Mehedinti|21.31|46.19|
|         B|      București|  Bucuresti|     Revolutiei| 26.1|44.44|
|        BC|          Bacău|      Bacau| Mihai Eminescu|26.91|46.57|
|        BH|          Bihor|     Oradea|      Lalelelor|21.94|47.05|
|        BN|Bistrița Năsăud|   Bistrita|          Dacia|24.49|47.13|
|        BR|         Brăila|     Braila|  Trandafirilor|27.97|45.27|
|        BT|       Botoșani|   Botosani|   Masinistilor|26.11|47.88|
|        BV|         Brașov|     Brasov|     Republicii| 25.6|45.65|
|        BZ|          Buzău|      Buzau|    Ion Creanga|26.82|45.15|
|        CJ|           Cluj|Cluj-N

city,lon,lat
Alba-Iulia,23.57,46.07
Pitesti,24.88,44.86
Arad,21.31,46.19
Bucuresti,26.1,44.44
Bacau,26.91,46.57
Oradea,21.94,47.05
Bistrita,24.49,47.13
Braila,27.97,45.27
Botosani,26.11,47.88
Brasov,25.6,45.65


Output can only be rendered in Databricks

Out[7]: <pyspark.sql.streaming.StreamingQuery at 0x7f57e066dee0>