In [4]:
%pip install git+https://github.com/databrickslabs/dbldatagen

Collecting git+https://github.com/databrickslabs/dbldatagen
  Cloning https://github.com/databrickslabs/dbldatagen to /tmp/pip-req-build-qm5f8emk
  Running command git clone --filter=blob:none --quiet https://github.com/databrickslabs/dbldatagen /tmp/pip-req-build-qm5f8emk
  Resolved https://github.com/databrickslabs/dbldatagen to commit 14a1bece08f8cebfe9ad050301f32f8afa92f875
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: dbldatagen
  Building wheel for dbldatagen (setup.py) ... [?25ldone
[?25h  Created wheel for dbldatagen: filename=dbldatagen-0.2.0rc1-py3-none-any.whl size=68973 sha256=784721f0f7a297aec904e31b0610455d604ecc5ae198a029d0adf20783703595
  Stored in directory: /tmp/pip-ephem-wheel-cache-u5_58xs2/wheels/fa/2f/29/fd337c2f0a1da95c1069d7f4898c3ff5a0ea697cb5d31f319c
Successfully built dbldatagen
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.2.0rc1
Note: you may need to restart the kernel to use u

In [15]:
from pyspark.sql.types import LongType, IntegerType, StringType

import dbldatagen as dg

shuffle_partitions_requested = 12 * 4
partitions_requested = 96
device_population = 100000
data_rows = 1000 * 100000

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG', 'ES',
                 'GE', 'MX', 'ET', 'SA',
                 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58, 8,
                   17]

manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']

lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']

testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                                 partitions=partitions_requested, randomSeedMethod='hash_fieldname')
                .withIdOutput()
                # we'll use hash of the base field to generate the ids to 
                # avoid a simple incrementing sequence
                .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
                            uniqueValues=device_population, omit=True, baseColumnType="hash")

                # note for format strings, we must use "%lx" not "%x" as the 
                # underlying value is a long
                .withColumn("device_id", StringType(), format="0x%013x",
                            baseColumn="internal_device_id")

                # the device / user attributes will be the same for the same device id 
                # so lets use the internal device id as the base column for these attribute
                .withColumn("country", StringType(), values=country_codes,
                            weights=country_weights,
                            baseColumn="internal_device_id")
                .withColumn("manufacturer", StringType(), values=manufacturers,
                            baseColumn="internal_device_id")

                # use omit = True if you don't want a column to appear in the final output 
                # but just want to use it as part of generation of another column
                .withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
                            baseColumnType="hash", omit=True)
                .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
                            baseColumn="device_id",
                            baseColumnType="hash", omit=True)

                .withColumn("model_line", StringType(), expr="concat(line)",
                            baseColumn=["line", "model_ser"])
                .withColumn("event_type", StringType(),
                            values=["activation", "deactivation", "plan change",
                                    "telecoms activity", "internet activity", "device error"],
                            random=True)
                .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)

                )

dfTestData = testDataSpec.build()

# dfTestData.write.format("csv").mode("overwrite").save(
#     "gs://spark-poc-ca/dbldatagen_examples/a_billion_row_table")

In [2]:
!pip install git+https://github.com/databrickslabs/dbldatagen

Collecting git+https://github.com/databrickslabs/dbldatagen
  Cloning https://github.com/databrickslabs/dbldatagen to /tmp/pip-req-build-ko4bwkyi
  Running command git clone --filter=blob:none --quiet https://github.com/databrickslabs/dbldatagen /tmp/pip-req-build-ko4bwkyi
  Resolved https://github.com/databrickslabs/dbldatagen to commit 14a1bece08f8cebfe9ad050301f32f8afa92f875
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: dbldatagen
  Building wheel for dbldatagen (setup.py) ... [?25ldone
[?25h  Created wheel for dbldatagen: filename=dbldatagen-0.2.0rc1-py3-none-any.whl size=68973 sha256=e884a1d117be0e126707688e7231c12c5e1e4007a1d85dc91d96ecb79e3a9ed1
  Stored in directory: /tmp/pip-ephem-wheel-cache-mla35a7v/wheels/fa/2f/29/fd337c2f0a1da95c1069d7f4898c3ff5a0ea697cb5d31f319c
Successfully built dbldatagen
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.2.0rc1


In [9]:
dfTestData.show()

+---+---------------+-------+---------------+----------+-----------------+-------------------+
| id|      device_id|country|   manufacturer|model_line|       event_type|           event_ts|
+---+---------------+-------+---------------+----------+-----------------+-------------------+
|  0|0x100000001281d|     CN|     Xyzzy Inc.| lakehouse|     deactivation|2020-12-30 21:02:00|
|  1|0x1000000013b1d|     US|     Delta corp|    gadget|internet activity|2020-07-22 16:02:00|
|  2|0x1000000011c18|     CN|Embanks Devices| lakehouse|     deactivation|2020-07-07 10:33:00|
|  3|0x10000000050e3|     IN|      Acme Corp|     droid|internet activity|2020-12-07 20:24:00|
|  4|0x1000000003674|     CN|     Xyzzy Inc.| lakehouse|       activation|2020-05-28 14:10:00|
|  5|0x100000001492c|     CN|Embanks Devices| lakehouse|internet activity|2020-05-31 02:17:00|
|  6|0x100000000b185|     CN|     Xyzzy Inc.| lakehouse|telecoms activity|2020-02-07 21:24:00|
|  7|0x10000000041ab|     IN|  Lakehouse Ltd|     

In [5]:
dfTestData.write.format("csv").mode("overwrite").save(
    "gs://spark-poc-ca/dbldatagen_examples/a_billion_row_table")

                                                                                

In [16]:
dfTestData.coalesce(1).write.csv("gs://spark-poc-ca/test/a_billion_row_table3")

                                                                                

In [13]:
dfTestData.repartition(1).write.csv("gs://spark-poc-ca/test/a_billion_row_table4")

                                                                                