In [0]:
%pip install dbldatagen

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting dbldatagen
  Downloading dbldatagen-0.3.5-py3-none-any.whl (86 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 86.3/86.3 kB 2.0 MB/s eta 0:00:00
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.3.5
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Restart the notebook's Python process so the package installed by %pip above
# becomes importable (as the pip output recommends). Restarting clears any
# previously defined Python variables, so all setup happens in the cells below.
dbutils.library.restartPython()

In [0]:
import os

import dbldatagen as dg
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, IntegerType, StringType
from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, \
                              DoubleType, BooleanType, ShortType, \
                              TimestampType, DateType, DecimalType, \
                              ByteType, BinaryType, ArrayType, MapType, \
                              StructType, StructField

# Obtain the Spark session. getOrCreate() hands back the already-active session
# when one exists (Databricks notebooks start with one) rather than building a new one.
session_builder = SparkSession.builder.appName("AzureDatabricksToBlob")
spark = session_builder.getOrCreate()

In [0]:
# Azure Blob Storage settings.
# Connection values come from environment variables when they are set; otherwise
# they fall back to empty strings that must be filled in before writing.
# Avoid pasting the account key into the notebook: prefer a secret store, e.g.
# dbutils.secrets.get(scope=..., key=...), or a cluster environment variable.
account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "")
account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "")
container_name = os.environ.get("AZURE_STORAGE_CONTAINER", "")
blob_folder = "dbGenOutput"  # Optional, you can save directly to the root of the container
# Spark writes a *directory* with this name that holds the JSON part files;
# it does not produce a single file called jsonData1.json.
blob_name = "jsonData1.json"

In [0]:
# Set configurations for Blob Storage and build the wasbs:// output location.

# Fail fast with a readable error instead of an opaque storage failure at write time.
required_settings = {
    "account_name": account_name,
    "account_key": account_key,
    "container_name": container_name,
}
missing_settings = [name for name, value in required_settings.items() if not value]
if missing_settings:
    raise ValueError(
        "Azure Blob Storage settings must be filled in before writing: "
        + ", ".join(missing_settings)
    )

# Register the storage account key for wasbs:// access. Azure Databricks documents
# this session-level key without the "spark.hadoop." prefix (that prefix belongs
# in cluster-level Spark config).
spark.conf.set(f"fs.azure.account.key.{account_name}.blob.core.windows.net", account_key)

# blob_folder is optional: omit the folder segment when it is empty so the path
# does not contain a double slash.
folder_segment = f"{blob_folder}/" if blob_folder else ""
output_path = (
    f"wasbs://{container_name}@{account_name}.blob.core.windows.net/"
    f"{folder_segment}{blob_name}"
)

In [0]:
# Country codes and their relative sampling weights. Weights are matched to codes
# by position, so the two lists must always have the same length.
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                 'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58, 8,
                   17]
assert len(country_codes) == len(country_weights), (
    f"{len(country_codes)} country codes but {len(country_weights)} weights"
)

# Total number of rows to generate, and the number of distinct device ids
# those rows are spread across (used as uniqueValues for the device id column).
generate_number_of_json_rows = 100_000_000
device_population = 100_000

# Candidate values for the per-device manufacturer and product line.
manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']

lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']


In [0]:
# Declare a dbldatagen spec for synthetic device-event rows, then build it into
# the Spark DataFrame `dfTestData` that the next cell writes out.
# Columns declared with omit=True are helper inputs consumed by other columns;
# presumably they are excluded from the built DataFrame (dbldatagen `omit` option).
# Non-omitted columns: id (from withIdOutput), device_id, country,
# manufacturer_info, event_info.
testDataSpec = (
    dg.DataGenerator(spark, name="device_data_set", rows=generate_number_of_json_rows,
                     partitions=8, randomSeedMethod='hash_fieldname')
    .withIdOutput()
    # we'll use hash of the base field to generate the ids to
    # avoid a simple incrementing sequence
    .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
                uniqueValues=device_population, omit=True, baseColumnType="hash")

    # Render the internal id as "0x" followed by a zero-padded 13-digit hex string.
    # NOTE(review): older guidance said to use "%lx" for long values, but this spec
    # uses "%x"; "%lx" is presumably not accepted by the underlying formatter —
    # confirm against the dbldatagen docs before changing it.
    .withColumn("device_id", StringType(), format="0x%013x",
                baseColumn="internal_device_id")

    # the device / user attributes should be the same for the same device id,
    # so let's use the internal device id as the base column for these attributes
    .withColumn("country", StringType(), values=country_codes,
                weights=country_weights,
                baseColumn="internal_device_id")

    .withColumn("manufacturer", StringType(), values=manufacturers,
                baseColumn="internal_device_id", omit=True)
    # line is derived from a hash of manufacturer, so presumably each manufacturer
    # always maps to the same product line
    .withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
                baseColumnType="hash", omit=True)
    # Pack the two omitted helper columns into one struct output column.
    .withColumn("manufacturer_info", StructType([StructField('line',StringType()),
                                                StructField('manufacturer', StringType())]),
                expr="named_struct('line', line, 'manufacturer', manufacturer)",
                baseColumn=['manufacturer', 'line'])


    # NOTE(review): model_ser is omitted and no other column in this spec uses it
    # as a base column, so it does not contribute to the output — remove or use it.
    .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
                baseColumn="device_id",
                baseColumnType="hash", omit=True)

    # Per-row event attributes, chosen randomly rather than derived from the device.
    .withColumn("event_type", StringType(),
                values=["activation", "deactivation", "plan change",
                        "telecoms activity", "internet activity", "device error"],
                random=True, omit=True)
    # Random timestamps in 2020, on a 1-minute grid between begin and end.
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00",
                end="2020-12-31 23:59:00",
                interval="1 minute", random=True, omit=True)

    # Pack event type and timestamp into one struct output column.
    .withColumn("event_info",
                 StructType([StructField('event_type',StringType()),
                             StructField('event_ts', TimestampType())]),
                expr="named_struct('event_type', event_type, 'event_ts', event_ts)",
                baseColumn=['event_type', 'event_ts'])
    )

# Materialize the spec as a DataFrame.
dfTestData = testDataSpec.build()


In [0]:
# Write the generated rows as JSON under `output_path`. Spark treats the path as a
# directory and typically writes one part file per partition; this scales well
# and is the recommended default.
# Set write_single_file = True to coalesce to a single partition first. The output
# is still a directory, but it then holds one JSON part file. All rows then flow
# through a single task, so expect this to be much slower for large row counts.
write_single_file = False

json_rows = dfTestData.coalesce(1) if write_single_file else dfTestData
json_rows.write.format("json").mode("overwrite").save(output_path)
