### Datagen example

From: https://databrickslabs.github.io/dbldatagen/public_docs/multi_table_data.html

In [18]:
import re

MARGIN_PATTERN= re.compile(r"\s*\|")  # margin detection pattern for stripMargin
def stripMargin(s): 
  """  strip margin removes leading space in multi line string before '|' """
  return "\n".join(re.split(MARGIN_PATTERN, s))


In [2]:
import dbldatagen as dg

# clear cache so that if we run multiple times to check performance, we're not relying on cache
spark.catalog.clearCache()

UNIQUE_PLANS = 20
PLAN_MIN_VALUE = 100

shuffle_partitions_requested = 8
partitions_requested = 1
data_rows = UNIQUE_PLANS # we'll generate one row for each plan

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)


In [3]:
plan_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
            .withColumn("plan_id","int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS)
            .withColumn("plan_name", prefix="plan", baseColumn="plan_id")  # use plan_id as root value
                
            # note default step is 1 so you must specify a step for small number ranges, 
            .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050, step=0.005, random=True)  
            .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02, step=0.001, random=True)  
            .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, maxValue=0.01, step=0.001, random=True)  
                
            # we're modelling long distance and international prices simplistically - each is a multiplier thats applied to base rate
            .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05, random=True, 
                        distribution="normal", omit=True)  
            .withColumn("ld_cost_per_minute", "decimal(5,3)", expr="cost_per_minute * ld_multiplier", 
                        baseColumns=['cost_per_minute', 'ld_multiplier'])  
            .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05, random=True, 
                        distribution="normal", omit=True)  
            .withColumn("intl_cost_per_minute", "decimal(5,3)", expr="cost_per_minute * intl_multiplier", 
                        baseColumns=['cost_per_minute', 'intl_multiplier'])  
                
            )
df_plans = (plan_dataspec.build()
            .cache()
           )

display(df_plans)

DataFrame[plan_id: int, plan_name: string, cost_per_mb: decimal(5,3), cost_per_message: decimal(5,3), cost_per_minute: decimal(5,3), ld_cost_per_minute: decimal(5,3), intl_cost_per_minute: decimal(5,3)]

In [4]:
df_plans.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------+---------+-----------+----------------+---------------+------------------+--------------------+
|plan_id|plan_name|cost_per_mb|cost_per_message|cost_per_minute|ld_cost_per_minute|intl_cost_per_minute|
+-------+---------+-----------+----------------+---------------+------------------+--------------------+
|    100| plan_100|      0.025|           0.012|          0.010|             0.030|               0.031|
|    101| plan_101|      0.045|           0.018|          0.005|             0.013|               0.015|
|    102| plan_102|      0.005|           0.007|          0.008|             0.014|               0.032|
|    103| plan_103|      0.015|           0.006|          0.006|             0.009|               0.012|
|    104| plan_104|      0.010|           0.011|          0.001|             0.003|               0.002|
|    105| plan_105|      0.030|           0.014|          0.007|             0.014|               0.020|
|    106| plan_106|      0.020|           0.020|       

                                                                                

In [5]:
import dbldatagen as dg
import pyspark.sql.functions as F

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)

UNIQUE_CUSTOMERS = 50000
CUSTOMER_MIN_VALUE = 1000
DEVICE_MIN_VALUE = 1000000000
SUBSCRIBER_NUM_MIN_VALUE = 1000000000

spark.catalog.clearCache()  # clear cache so that if we run multiple times to check performance, we're not relying on cache
shuffle_partitions_requested = 8
partitions_requested = 8
data_rows = UNIQUE_CUSTOMERS

customer_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
            .withColumn("customer_id","decimal(10)", minValue=CUSTOMER_MIN_VALUE, uniqueValues=UNIQUE_CUSTOMERS)
            .withColumn("customer_name", template=r"\\w \\w|\\w a. \\w")  
           
            # use the following for a simple sequence
            #.withColumn("device_id","decimal(10)", minValue=DEVICE_MIN_VALUE, uniqueValues=UNIQUE_CUSTOMERS)
                     
            .withColumn("device_id","decimal(10)",  minValue=DEVICE_MIN_VALUE, 
                        baseColumn="customer_id", baseColumnType="hash")

            .withColumn("phone_number","decimal(10)",  minValue=SUBSCRIBER_NUM_MIN_VALUE, 
                        baseColumn=["customer_id", "customer_name"], baseColumnType="hash")

            # for email, we'll just use the formatted phone number
            .withColumn("email","string",  format="subscriber_%s@myoperator.com", baseColumn="phone_number")
            .withColumn("plan", "int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS, random=True)
            )

df_customers = (customer_dataspec.build()
                .dropDuplicates(["device_id"])
                .dropDuplicates(["phone_number"])
                .orderBy("customer_id")
                .cache()
               )

effective_customers = df_customers.count()

print(stripMargin(f"""revised customers : {df_customers.count()}, 
       |   unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]}, 
       |   unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]},
       |   unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""")
     )

display(df_customers)

                                                                                

revised customers : 50000,
   unique customers: 50000,
   unique device ids: 50000,
   unique phone numbers: 50000


DataFrame[customer_id: decimal(10,0), customer_name: string, device_id: decimal(10,0), phone_number: decimal(10,0), email: string, plan: int]

In [6]:
df_customers.show()

+-----------+--------------------+----------+------------+--------------------+----+
|customer_id|       customer_name| device_id|phone_number|               email|plan|
+-----------+--------------------+----------+------------+--------------------+----+
|       1000|  commodo m. nostrud|9196676281|  1046828624|subscriber_104682...| 118|
|       1001|               do ex|7945719524|  3100324535|subscriber_310032...| 108|
|       1002|        anim o. anim|1018593222|  8004658441|subscriber_800465...| 106|
|       1003|        dolor k. est|9858441589|  8174126224|subscriber_817412...| 115|
|       1004|   proident deserunt|1907930576|  2004998683|subscriber_200499...| 110|
|       1005|    pariatur r. aute|8365271101|  8633827457|subscriber_863382...| 117|
|       1006|adipiscing o. cup...|2906230381|  7923251099|subscriber_792325...| 118|
|       1007|          sunt t. ea|2027317295|  8445814606|subscriber_844581...| 103|
|       1008|    adipiscing dolor|9328194574|  2714866555|subscri

In [7]:
import dbldatagen as dg

AVG_EVENTS_PER_CUSTOMER = 50

spark.catalog.clearCache()  # clear cache so that if we run multiple times to check performance, we're not relying on cache
shuffle_partitions_requested = 8
partitions_requested = 8
NUM_DAYS=31
MB_100 = 100 * 1000 * 1000
K_1 = 1000
data_rows = AVG_EVENTS_PER_CUSTOMER * UNIQUE_CUSTOMERS * NUM_DAYS

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)


# use random seed method of 'hash_fieldname' for better spread - default in later builds
events_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested, randomSeed=42,
                                    randomSeedMethod="hash_fieldname")
             # use same logic as per customers dataset to ensure matching keys - but make them random
            .withColumn("device_id_base", "decimal(10)", minValue=CUSTOMER_MIN_VALUE, uniqueValues=UNIQUE_CUSTOMERS,
                        random=True, omit=True)
            .withColumn("device_id", "decimal(10)",  minValue=DEVICE_MIN_VALUE,
                        baseColumn="device_id_base", baseColumnType="hash")

            # use specific random seed to get better spread of values
            .withColumn("event_type", "string",  values=["sms", "internet", "local call", "ld call", "intl call"],
                                                weights=[50, 50, 20, 10, 5], random=True)

            # use Gamma distribution for skew towards short calls
            .withColumn("base_minutes","decimal(7,2)",  minValue=1.0, maxValue=100.0, step=0.1,
                        distribution=dg.distributions.Gamma(shape=1.5, scale=2.0), random=True, omit=True)
                   
            # use Gamma distribution for skew towards short transfers
            .withColumn("base_bytes_transferred","decimal(12)",  minValue=K_1, maxValue=MB_100, 
                        distribution=dg.distributions.Gamma(shape=0.75, scale=2.0), random=True, omit=True)
                   
            .withColumn("minutes", "decimal(7,2)", baseColumn=["event_type", "base_minutes"],
                        expr="""
                              case when event_type in ("local call", "ld call", "intl call") then base_minutes
                              else 0
                              end
                               """)
            .withColumn("bytes_transferred", "decimal(12)", baseColumn=["event_type", "base_bytes_transferred"],
                        expr="""
                              case when event_type = "internet" then base_bytes_transferred
                              else 0
                              end
                               """)
                   
            .withColumn("event_ts", "timestamp", data_range=dg.DateRange("2020-07-01 00:00:00",
                                                                             "2020-07-31 11:59:59",
                                                                             "seconds=1"),
                        random=True)
                   
            )

df_events = (events_dataspec.build()
               )

display(df_events)

DataFrame[device_id: decimal(10,0), event_type: string, minutes: decimal(7,2), bytes_transferred: decimal(12,0), event_ts: timestamp]

In [8]:
df_events.show()

+----------+----------+-------+-----------------+-------------------+
| device_id|event_type|minutes|bytes_transferred|           event_ts|
+----------+----------+-------+-----------------+-------------------+
|8783384069|local call|   5.90|                0|2020-07-26 16:17:55|
|8766749269|       sms|   0.00|                0|2020-07-23 14:06:49|
|8944400746|       sms|   0.00|                0|2020-07-18 04:56:45|
|8471543556|       sms|   0.00|                0|2020-07-04 02:02:40|
|1954922029|       sms|   0.00|                0|2020-07-16 12:06:18|
|8158729839|  internet|   0.00|           520279|2020-07-02 23:47:56|
|9534966611|   ld call|   1.10|                0|2020-07-20 12:20:28|
|2395997193|local call|  13.90|                0|2020-07-28 08:30:32|
|9043775345|       sms|   0.00|                0|2020-07-30 06:33:32|
|9445756890|       sms|   0.00|                0|2020-07-27 17:38:34|
|9852527388|  internet|   0.00|           897661|2020-07-16 05:37:01|
|9853758216|       s

In [9]:
df_customer_pricing = df_customers.join(df_plans, df_plans.plan_id == df_customers.plan)

display(df_customer_pricing)

DataFrame[customer_id: decimal(10,0), customer_name: string, device_id: decimal(10,0), phone_number: decimal(10,0), email: string, plan: int, plan_id: int, plan_name: string, cost_per_mb: decimal(5,3), cost_per_message: decimal(5,3), cost_per_minute: decimal(5,3), ld_cost_per_minute: decimal(5,3), intl_cost_per_minute: decimal(5,3)]

In [10]:
df_customer_pricing.show()

+-----------+--------------------+----------+------------+--------------------+----+-------+---------+-----------+----------------+---------------+------------------+--------------------+
|customer_id|       customer_name| device_id|phone_number|               email|plan|plan_id|plan_name|cost_per_mb|cost_per_message|cost_per_minute|ld_cost_per_minute|intl_cost_per_minute|
+-----------+--------------------+----------+------------+--------------------+----+-------+---------+-----------+----------------+---------------+------------------+--------------------+
|      50523|          nisi c. id|9474651738|  1000111976|subscriber_100011...| 107|    107| plan_107|      0.025|           0.017|          0.006|             0.010|               0.020|
|       3916|       dolor commodo|2599584759|  1000238254|subscriber_100023...| 107|    107| plan_107|      0.025|           0.017|          0.006|             0.010|               0.020|
|      41040|             non est|1344902705|  1000703061|su

In [11]:
import pyspark.sql.functions as F


# lets compute the summary minutes messages and bytes transferred
df_enriched_events = (df_events
                      .withColumn("message_count", F.expr("case when event_type='sms' then 1 else 0 end"))
                      .withColumn("ld_minutes", F.expr("case when event_type='ld call' then cast(ceil(minutes) as decimal(18,3)) else 0.0 end"))
                      .withColumn("local_minutes", F.expr("case when event_type='local call' then cast(ceil(minutes) as decimal(18,3)) else 0.0 end"))
                      .withColumn("intl_minutes", F.expr("case when event_type='intl call' then cast(ceil(minutes) as decimal(18,3)) else 0.0 end"))
                     )

df_enriched_events.createOrReplaceTempView("telephony_events")

df_summary = spark.sql("""select device_id, 
                                 round(sum(bytes_transferred) / 1000000.0, 3) as total_mb, 
                                 sum(message_count) as total_messages,
                                 sum(ld_minutes) as total_ld_minutes,
                                 sum(local_minutes) as total_local_minutes,
                                 sum(intl_minutes) as total_intl_minutes, 
                                 count(device_id) as event_count
                                 from telephony_events
                                 group by device_id
                          
""")

df_summary.createOrReplaceTempView("event_summary")

display(df_summary.where("event_count > 0"))

DataFrame[device_id: decimal(10,0), total_mb: decimal(27,3), total_messages: bigint, total_ld_minutes: decimal(28,3), total_local_minutes: decimal(28,3), total_intl_minutes: decimal(28,3), event_count: bigint]

In [12]:
df_summary.show()



+----------+--------+--------------+----------------+-------------------+------------------+-----------+
| device_id|total_mb|total_messages|total_ld_minutes|total_local_minutes|total_intl_minutes|event_count|
+----------+--------+--------------+----------------+-------------------+------------------+-----------+
|1954922029|4869.900|           629|        1473.000|           2345.000|           722.000|       1656|
|9852527388|4634.819|           568|        1194.000|           2528.000|           727.000|       1561|
|8578285899|4433.690|           591|        1403.000|           2257.000|           510.000|       1541|
|9257138445|4692.269|           529|        1329.000|           2914.000|           672.000|       1533|
|2749260882|4317.791|           560|        1463.000|           2513.000|           591.000|       1546|
|9128361440|4162.059|           538|        1360.000|           2690.000|           623.000|       1478|
|9087208703|4187.461|           569|        1342.000|  

                                                                                

In [13]:
df_customer_summary = (df_customer_pricing.join(df_summary,df_customer_pricing.device_id == df_summary.device_id )
                       .createOrReplaceTempView("customer_summary"))

df_invoices = spark.sql("""select *, 
                           internet_cost + sms_cost + ld_cost + local_cost + intl_cost as total_invoice 
                           from 
                             (select customer_id, customer_name, phone_number, email, plan_name, 
                             cast(round(total_mb * cost_per_mb, 2) as decimal(18,3)) as internet_cost,
                             cast(round(total_ld_minutes * ld_cost_per_minute, 2) as decimal(18,2)) as ld_cost,
                             cast(round(total_local_minutes * cost_per_minute, 2) as decimal(18,2)) as local_cost,
                             cast(round(total_intl_minutes * intl_cost_per_minute, 2) as decimal(18,2)) as intl_cost,
                             cast(round(total_messages * cost_per_message, 2) as decimal(18,2)) as sms_cost
                             from customer_summary)

""")

display(df_invoices)

DataFrame[customer_id: decimal(10,0), customer_name: string, phone_number: decimal(10,0), email: string, plan_name: string, internet_cost: decimal(18,3), ld_cost: decimal(18,2), local_cost: decimal(18,2), intl_cost: decimal(18,2), sms_cost: decimal(18,2), total_invoice: decimal(23,3)]

In [14]:
df_invoices.show()



+-----------+--------------------+------------+--------------------+---------+-------------+-------+----------+---------+--------+-------------+
|customer_id|       customer_name|phone_number|               email|plan_name|internet_cost|ld_cost|local_cost|intl_cost|sms_cost|total_invoice|
+-----------+--------------------+------------+--------------------+---------+-------------+-------+----------+---------+--------+-------------+
|      12631| cupidatat h. cillum|  9908074451|subscriber_990807...| plan_108|       48.700|   4.42|      2.35|     2.17|    0.63|       58.270|
|       3688|         qui laborum|  1577114986|subscriber_157711...| plan_101|      208.570|  15.52|     12.64|    10.91|   10.22|      257.860|
|      41111|        ut w. mollit|  1290291097|subscriber_129029...| plan_110|      221.680|  25.25|     22.57|    13.77|   11.82|      295.090|
|      16959|        excepteur ex|  2931211720|subscriber_293121...| plan_101|      211.150|  17.28|     14.57|    10.08|    9.52|

                                                                                

In [15]:
print(df_invoices.count())



50000


                                                                                

### Spider schema
Using huggingface datasets.

In [7]:
import pandas as pd
import re
from datasets import load_dataset_builder
from datasets import load_dataset
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [8]:
# pd.set_option('display.max_rows', 500)
# pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [9]:
dataset_name = "richardr1126/spider-schema"
dataset = load_dataset(dataset_name, split="train")
df_schema = pd.DataFrame(dataset)
df_schema

Unnamed: 0,db_id,Schema (values (type)),Primary Keys,Foreign Keys
0,perpetrator,"perpetrator : Perpetrator_ID (number) , People...",perpetrator : Perpetrator_ID | people : People_ID,perpetrator : People_ID equals people : People_ID
1,college_2,"classroom : building (text) , room_number (tex...",classroom : building | department : dept_name ...,course : dept_name equals department : dept_na...
2,flight_company,"airport : id (number) , City (text) , Country ...",airport : id | operate_company : id | flight : id,flight : company_id equals operate_company : i...
3,icfp_1,"Inst : instID (number) , name (text) , country...",Inst : instID | Authors : authID | Papers : pa...,Authorship : paperID equals Papers : paperID |...
4,body_builder,"body_builder : Body_Builder_ID (number) , Peop...",body_builder : Body_Builder_ID | people : Peop...,body_builder : People_ID equals people : Peopl...
...,...,...,...,...
161,company_1,"works_on : Essn (number) , Pno (number) , Hour...",works_on : Essn | employee : Ssn | department ...,
162,workshop_paper,"workshop : Workshop_ID (number) , Date (text) ...",workshop : Workshop_ID | submission : Submissi...,Acceptance : Workshop_ID equals workshop : Wor...
163,epinions_1,"item : i_id (number) , title (text) | review :...",item : i_id | review : a_id | useracct : u_id,review : i_id equals item : i_id | review : u_...
164,party_host,"party : Party_ID (number) , Party_Theme (text)...",party : Party_ID | host : Host_ID | party_host...,party_host : Party_ID equals party : Party_ID ...


In [10]:
schema = list(df_schema.loc[df_schema['db_id'] == 'yelp']['Schema (values (type))'])[0]
schema

'business : bid (number) , business_id (text) , name (text) , full_address (text) , city (text) , latitude (text) , longitude (text) , review_count (number) , is_open (number) , rating (number) , state (text) | category : id (number) , business_id (text) , category_name (text) | user : uid (number) , user_id (text) , name (text) | checkin : cid (number) , business_id (text) , count (number) , day (text) | neighbourhood : id (number) , business_id (text) , neighbourhood_name (text) | review : rid (number) , business_id (text) , user_id (text) , rating (number) , text (text) , year (number) , month (text) | tip : tip_id (number) , business_id (text) , text (text) , user_id (text) , likes (number) , year (number) , month (text)'

In [12]:
def parse_field(s):
    spark_types = {
        'number': IntegerType(),
        'text': StringType(),
    }
    match = re.match(r"(.*?) \((.*?)\)", s)
    if match:
        name, dtype = match.groups()
        return StructField(name, spark_types[dtype])
    else:
        raise ValueError(f"Unable to parse: {s}")
        
def parse_table(s):
    name, fields = s.split(" : ")
    struct_fields = [parse_field(f) for f in fields.split(" , ")]
    schema = StructType(struct_fields)
    return name, schema

In [16]:
spark_schemas = dict(parse_table(t) for t in schema.split(" | "))
spark_schemas

{'business': StructType([StructField('bid', IntegerType(), True), StructField('business_id', StringType(), True), StructField('name', StringType(), True), StructField('full_address', StringType(), True), StructField('city', StringType(), True), StructField('latitude', StringType(), True), StructField('longitude', StringType(), True), StructField('review_count', IntegerType(), True), StructField('is_open', IntegerType(), True), StructField('rating', IntegerType(), True), StructField('state', StringType(), True)]),
 'category': StructType([StructField('id', IntegerType(), True), StructField('business_id', StringType(), True), StructField('category_name', StringType(), True)]),
 'user': StructType([StructField('uid', IntegerType(), True), StructField('user_id', StringType(), True), StructField('name', StringType(), True)]),
 'checkin': StructType([StructField('cid', IntegerType(), True), StructField('business_id', StringType(), True), StructField('count', IntegerType(), True), StructField

In [75]:
import dbldatagen as dg
import dbldatagen.distributions as dist

In [14]:
# clear cache so that if we run multiple times to check performance, we're not relying on cache
spark.catalog.clearCache()

shuffle_partitions_requested = 8
partitions_requested = 1
data_rows = 100

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)

In [None]:
schema = spark_schemas['business']
schema

StructType([StructField('bid', IntegerType(), True), StructField('business_id', StringType(), True), StructField('name', StringType(), True), StructField('full_address', StringType(), True), StructField('city', StringType(), True), StructField('latitude', StringType(), True), StructField('longitude', StringType(), True), StructField('review_count', IntegerType(), True), StructField('is_open', IntegerType(), True), StructField('rating', IntegerType(), True), StructField('state', StringType(), True)])

In [None]:
dataspec = dg.DataGenerator(spark, rows=100, partitions=2).withSchema(schema)

In [None]:
df = dataspec.build()
df.show()

+---+-----------+----+------------+----+--------+---------+------------+-------+------+-----+
|bid|business_id|name|full_address|city|latitude|longitude|review_count|is_open|rating|state|
+---+-----------+----+------------+----+--------+---------+------------+-------+------+-----+
|  0|          0|   0|           0|   0|       0|        0|           0|      0|     0|    0|
|  1|          1|   1|           1|   1|       1|        1|           1|      1|     1|    1|
|  2|          2|   2|           2|   2|       2|        2|           2|      2|     2|    2|
|  3|          3|   3|           3|   3|       3|        3|           3|      3|     3|    3|
|  4|          4|   4|           4|   4|       4|        4|           4|      4|     4|    4|
|  5|          5|   5|           5|   5|       5|        5|           5|      5|     5|    5|
|  6|          6|   6|           6|   6|       6|        6|           6|      6|     6|    6|
|  7|          7|   7|           7|   7|       7|        7| 

In [None]:
dataspec2 = (
    dg.DataGenerator(spark, rows=100, partitions=2).withSchema(schema)
    .withColumnSpec("business_id", minValue=100, maxValue=10000, random=True)
    .withColumnSpec("name", text=dg.ILText(words=(2,3)))
    .withColumnSpec("full_address", template=r'\n \w \w')
    .withColumnSpec("city", text=dg.ILText(words=(1,3)))
    .withColumnSpec("latitude", template=r'\n.\n\n')
    .withColumnSpec("longitude", template=r'\n.\n\n')
    .withColumnSpec("review_count", minValue=1, maxValue=1000, random=True, distribution=dist.Gamma(1.0, 2.0))
    .withColumnSpec("is_open", minValue=0, maxValue=1, random=True)
    .withColumnSpec("rating", minValue=1, maxValue=5, random=True)
    .withColumnSpec("state", values=["CA", "OR", "WA", "NV", "AZ"], random=True)
)

In [None]:
df = dataspec2.build()
df.show()

+---+-----------+--------------------+--------------------+--------------------+----------+----------+------------+-------+------+-----+
|bid|business_id|                name|        full_address|                city|  latitude| longitude|review_count|is_open|rating|state|
+---+-----------+--------------------+--------------------+--------------------+----------+----------+------------+-------+------+-----+
|  0|       5420|        Ut elit sed.|49 cupidatat repr...|         Mollit sed.| 126.10287| 53.124234|         133|      1|     4|   WA|
|  1|       4168|Adipiscing cillum...|   237 dolor aliquip|         Laboris ut.| 16.171174|138.141184|         144|      0|     3|   NV|
|  2|       6217|    Commodo commodo.|      109 anim velit|   Id proident duis.| 138.13053|118.155148|         142|      1|     2|   AZ|
|  3|       6565|Aliquip dolor nos...|        127 velit in|              Magna.| 151.11278|   15.2094|          81|      1|     2|   CA|
|  4|       9003|      Dolor ullamco.|   

### Importing sqlite DB
Per: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/from_to_dbms.html

In [1]:
import pandas as pd
import pyspark.pandas as ps
import sqlite3

from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType



In [2]:
database = '/home/leey/devpub/spider/spider/database/college_1/college_1.sqlite'

In [3]:
con = sqlite3.connect(database)
cur = con.cursor()

24/04/12 01:14:40 WARN Dispatcher: Message RequestMessage(dgx2h0194.spark.sjc4.nvmetal.net:7077, NettyRpcEndpointRef(spark://AppClient@10.150.30.2:46353), ExecutorAdded(0,worker-20240412011406-10.150.30.2-45313,10.150.30.2:45313,6,16384)) dropped due to sparkEnv is stopped. Could not find AppClient.
24/04/12 01:14:40 WARN Dispatcher: Message RequestMessage(dgx2h0194.spark.sjc4.nvmetal.net:7077, NettyRpcEndpointRef(spark://AppClient@10.150.30.2:46353), ExecutorAdded(1,worker-20240412011406-10.150.30.2-45313,10.150.30.2:45313,6,16384)) dropped due to sparkEnv is stopped. Could not find AppClient.


#### sample query

In [None]:
res = cur.execute("""
SELECT T1.stu_fname 
FROM student AS T1 
JOIN enroll AS T2 ON T1.stu_num  =  T2.stu_num 
JOIN CLASS AS T3 ON T2.class_code  =  T3.class_code 
JOIN course AS T4 ON T3.crs_code  =  T4.crs_code 
JOIN department AS T5 ON T5.dept_code  =  T4.dept_code 
WHERE T5.dept_name  =  'Accounting' 
INTERSECT 
SELECT T1.stu_fname FROM student AS T1 
JOIN enroll AS T2 ON T1.stu_num  =  T2.stu_num 
JOIN CLASS AS T3 ON T2.class_code  =  T3.class_code 
JOIN course AS T4 ON T3.crs_code  =  T4.crs_code 
JOIN department AS T5 ON T5.dept_code  =  T4.dept_code 
WHERE T5.dept_name  =  'Computer Info. Systems'
""")

In [None]:
res.fetchall()

[('Anne',), ('William',)]

#### read table into pyspark

In [None]:
psdf = ps.read_sql("class", con="jdbc:sqlite:/home/leey/devpub/spider/spider/database/college_1/college_1.sqlite")

In [None]:
psdf

24/04/11 21:01:34 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

Unnamed: 0,CLASS_CODE,CRS_CODE,CLASS_SECTION,CLASS_TIME,CLASS_ROOM,PROF_NUM
0,10012,ACCT-211,1,MWF 8:00-8:50 a.m.,BUS311,105
1,10013,ACCT-211,2,MWF 9:00-9:50 a.m.,BUS200,105
2,10014,ACCT-211,3,TTh 2:30-3:45 p.m.,BUS252,342
3,10015,ACCT-212,1,MWF 10:00-10:50 a.m.,BUS311,301
4,10016,ACCT-212,2,Th 6:00-8:40 p.m.,BUS252,301
5,10017,CIS-220,1,MWF 9:00-9:50 a.m.,KLR209,228
6,10018,CIS-220,2,MWF 9:00-9:50 a.m.,KLR211,114
7,10019,CIS-220,3,MWF 10:00-10:50 a.m.,KLR209,228
8,10020,CIS-420,1,W 6:00-8:40 p.m.,KLR209,162
9,10021,QM-261,1,MWF 8:00-8:50 a.m.,KLR200,114


In [None]:
df = psdf.spark.frame()

In [None]:
df.show()

                                                                                

+----------+--------+-------------+--------------------+----------+--------+
|CLASS_CODE|CRS_CODE|CLASS_SECTION|          CLASS_TIME|CLASS_ROOM|PROF_NUM|
+----------+--------+-------------+--------------------+----------+--------+
|     10012|ACCT-211|            1|  MWF 8:00-8:50 a.m.|    BUS311|     105|
|     10013|ACCT-211|            2|  MWF 9:00-9:50 a.m.|    BUS200|     105|
|     10014|ACCT-211|            3|  TTh 2:30-3:45 p.m.|    BUS252|     342|
|     10015|ACCT-212|            1|MWF 10:00-10:50 a.m.|    BUS311|     301|
|     10016|ACCT-212|            2|   Th 6:00-8:40 p.m.|    BUS252|     301|
|     10017| CIS-220|            1|  MWF 9:00-9:50 a.m.|    KLR209|     228|
|     10018| CIS-220|            2|  MWF 9:00-9:50 a.m.|    KLR211|     114|
|     10019| CIS-220|            3|MWF 10:00-10:50 a.m.|    KLR209|     228|
|     10020| CIS-420|            1|    W 6:00-8:40 p.m.|    KLR209|     162|
|     10021|  QM-261|            1|  MWF 8:00-8:50 a.m.|    KLR200|     114|

In [None]:
df.schema

StructType([StructField('CLASS_CODE', StringType(), True), StructField('CRS_CODE', StringType(), True), StructField('CLASS_SECTION', StringType(), True), StructField('CLASS_TIME', StringType(), True), StructField('CLASS_ROOM', StringType(), True), StructField('PROF_NUM', LongType(), True)])

### dbldatagen

In [None]:
import dbldatagen as dg

In [None]:
analyzer = dg.DataAnalyzer(sparkSession=spark, df=df)
generatedCode = analyzer.scriptDataGeneratorFromData()

24/04/11 21:03:44 WARN CacheManager: Asked to cache already cached data.



# Code snippet generated with Databricks Labs Data Generator (`dbldatagen`) DataAnalyzer class
# Install with `pip install dbldatagen` or in notebook with `%pip install dbldatagen`
# See the following resources for more details:
#
#   Getting Started - [https://databrickslabs.github.io/dbldatagen/public_docs/APIDOCS.html]
#   Github project - [https://github.com/databrickslabs/dbldatagen]
#
import dbldatagen as dg
import pyspark.sql.types

# Column definitions are stubs only - modify to generate correct data  
#
generation_spec = (
    dg.DataGenerator(sparkSession=spark, 
                     name='synthetic_data', 
                     rows=100000,
                     random=True,
                     )
    .withColumn('CLASS_CODE', 'string', template=r'\\w')
    .withColumn('CRS_CODE', 'string', template=r'\\w')
    .withColumn('CLASS_SECTION', 'string', template=r'\\w')
    .withColumn('CLASS_TIME', 'string', template=r'\\w')
    .withColumn('CLASS_ROOM', 'string', template=r'\\w

In [None]:
import dbldatagen as dg
import pyspark.sql.types

# Column definitions are stubs only - modify to generate correct data  
#
generation_spec = (
    dg.DataGenerator(sparkSession=spark, 
                     name='synthetic_data', 
                     rows=100000,
                     random=True,
                     )
    .withColumn('CLASS_CODE', 'string', template=r'\\w')
    .withColumn('CRS_CODE', 'string', template=r'\\w')
    .withColumn('CLASS_SECTION', 'string', template=r'\\w')
    .withColumn('CLASS_TIME', 'string', template=r'\\w')
    .withColumn('CLASS_ROOM', 'string', template=r'\\w')
    .withColumn('PROF_NUM', 'bigint', minValue=105, maxValue=342)
    )

In [None]:
df_synthetic = generation_spec.build()
df_synthetic.show()

[Stage 39:>                                                         (0 + 1) / 1]

+----------+-------------+-------------+------------+------------+--------+
|CLASS_CODE|     CRS_CODE|CLASS_SECTION|  CLASS_TIME|  CLASS_ROOM|PROF_NUM|
+----------+-------------+-------------+------------+------------+--------+
|        ad|       tempor|     pariatur|     eiusmod|     nostrud|     297|
|       non|    cupidatat|  consectetur|     commodo|        enim|     227|
|   laboris|           in|  consectetur|          ut|     commodo|     186|
|      amet|      officia|   incididunt|       nulla|         est|     307|
|   commodo|     proident|   incididunt|    occaecat|       ipsum|     327|
|    labore|       dolore|         duis|      fugiat|      mollit|     296|
|   eiusmod|           id|  consectetur|exercitation|          in|     302|
|   officia|        dolor|    consequat|      tempor|        anim|     177|
|     magna|       cillum|          sed|          ex|          ad|     137|
|     culpa|        nulla|         elit|     laboris|        anim|     267|
|   laborum|

                                                                                

In [None]:
df_synthetic.count()

100000

In [None]:
df_joined = df.union(df_synthetic)

In [None]:
df_joined.createOrReplaceTempView("class")

In [None]:
res = spark.sql("SELECT * from class where CRS_CODE='ACCT-211'")

In [None]:
res.show()

                                                                                

+----------+--------+-------------+------------------+----------+--------+
|CLASS_CODE|CRS_CODE|CLASS_SECTION|        CLASS_TIME|CLASS_ROOM|PROF_NUM|
+----------+--------+-------------+------------------+----------+--------+
|     10012|ACCT-211|            1|MWF 8:00-8:50 a.m.|    BUS311|     105|
|     10013|ACCT-211|            2|MWF 9:00-9:50 a.m.|    BUS200|     105|
|     10014|ACCT-211|            3|TTh 2:30-3:45 p.m.|    BUS252|     342|
+----------+--------+-------------+------------------+----------+--------+

