In [0]:
# IGNORE THIS IF YOU ALREADY HAVE a MOUNT POINT TO ACCESS YOUR DATA
blob_storage_name="<STORAGE_ACCOUNT_NAME>"
container_name="<CONTAINER_NAME>"
blob_storage_key="<ACCESS_KEY>"
spark.conf.set(f"fs.azure.account.key.{blob_storage_name}.blob.core.windows.net", blob_storage_key)
mount_point = f"/mnt/{container_name}"
source = f"wasbs://{container_name}@{blob_storage_name}.blob.core.windows.net"

if not mount_point in [m.mountPoint for m in dbutils.fs.mounts()]:
  dbutils.fs.mount(source = source,
                   mount_point = mount_point,
                   extra_configs = {f"fs.azure.account.key.{blob_storage_name}.blob.core.windows.net" : blob_storage_key})
else:
  print("Already mounted")


In [0]:
from pyspark.sql.functions import col,count,avg,max,min,stddev
import numpy as np
import pandas as pd
#Please set the number of shuffle partitions on the best setting for your environment 
spark.conf.set('spark.sql.shuffle.partitions',64)

#Analysis of a real dataset

The aim of this notebooks is to show how we can use several approaches to generate a synthetic dataset using pyspark leveraging on the characteristics of an existing dataset, looking at its statistics and/or following other requirements.

To try this approacches we will start from an example dataset downloaded from kaggle [Churn for Bank Customers](https://www.kaggle.com/mathchi/churn-for-bank-customers)

Let's start with the exploration!

In [0]:
# Reading the file from dbfs
source_df = spark.read.option("quote", "\"").option('header',True).csv("/mnt/fake/churn.csv")

In [0]:
source_size = source_df.count()
print(f"Number of documents on the source dataset: {source_size:,}")

We print the schema

In [0]:
source_df.printSchema()

Let's have a look at the actual data

In [0]:
display(source_df)

RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0
10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [0]:
source_df = (source_df.withColumn('creditScore',col('creditScore').cast('int'))
                      .withColumn('balance',col('balance').cast('float'))
                      .withColumn('EstimatedSalary',col('EstimatedSalary').cast('float'))
                      .withColumn('age',col('age').cast('int')))

### Gender

In [0]:
display(source_df)

RowNumber,CustomerId,Surname,creditScore,Geography,Gender,age,Tenure,balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0
6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.7,1
7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0
8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1
9,15792365,He,501,France,Male,44,4,142051.06,2,0,1,74940.5,0
10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0


In [0]:
geo_df = source_df.select('gender').groupBy('gender').agg((count('gender')/source_size).alias('prob'))
results = geo_df.collect()
gender_labels = [x['gender'] for x in results ]
gender_prob = [x['prob'] for x in results ]
print(gender_labels)
print(gender_prob)

### Geography

In [0]:
display(source_df.select('geography'))

geography
France
Spain
France
France
Spain
Spain
France
Germany
France
France


In [0]:
from pyspark.sql.functions import count
geo_df = source_df.select('geography').groupBy('geography').agg((count('geography')/source_size).alias('prob'))
results = geo_df.collect()
geography_labels = [x['geography'] for x in results ]
geography_prob = [x['prob'] for x in results ]
print(geography_labels)
print(geography_prob)

### Credit Score

In [0]:
display(source_df.select(col('creditScore').cast('int')).orderBy('creditScore'))

creditScore
350
350
350
350
350
351
358
359
363
365


In [0]:
source_df.printSchema()

In [0]:
display(source_df.select(min('creditScore').alias('min'),
                         max('creditScore').alias('max'),
                         avg('creditScore').alias('mean'),
                         stddev('creditScore').alias('std')))

min,max,mean,std
350,850,650.5288,96.65329873613037


In [0]:
results = source_df.select(min('creditScore').alias('min'),
                         max('creditScore').alias('max'),
                         avg('creditScore').alias('mean'),
                         stddev('creditScore').alias('std')).collect()
min_credit_score=int(results[0]['min'])
max_credit_score=int(results[0]['max'])
mean_credit_score=int(results[0]['mean'])
std_credit_score=int(results[0]['std'])
print(f"Minimum: {min_credit_score:,}, Maximum: {max_credit_score:,}, Mean: {mean_credit_score:,}, Std: {std_credit_score:,}")

Let's try to generate a similar distribution using numpy

In [0]:
size = 10000
# generate random integer
arr = np.random.normal(mean_credit_score, std_credit_score, size).astype(int)
# clip respecting boundaries
arr = np.clip(arr, min_credit_score, max_credit_score)  
fake_credit_score = spark.createDataFrame(pd.DataFrame({'fake_credit_score':arr}))
display(fake_credit_score)

fake_credit_score
481
562
692
537
744
850
643
716
530
682


### Balance

In [0]:
from pyspark.sql.functions import col
display(source_df.select(col('balance').cast('int')))

balance
0
83807
159660
0
125510
113755
0
115046
142051
134603


In [0]:
results = source_df.select(min('balance').alias('min'),
                         max('balance').alias('max'),
                         avg('balance').alias('mean'),
                         stddev('balance').alias('std')).collect()
min_balance_score=int(results[0]['min'])
max_balance_score=int(results[0]['max'])
mean_balance_score=int(results[0]['mean'])
std_balance_score=int(results[0]['std'])
print(f"Minimum: {min_balance_score:,}, Maximum: {max_balance_score:,}, Mean: {mean_balance_score:,}, Std: {std_balance_score:,}")

In [0]:
size = 10000
l=1
arr = np.random.poisson(l, size)
arr = arr * np.random.uniform(low = 0.0, high = 1.0, size = size)  
arr = arr * mean_balance_score
arr = np.round(arr, 2)      
df = pd.DataFrame({'balance':arr})
display(spark.createDataFrame(df))

balance
52218.16
6666.97
67949.73
0.0
0.0
0.0
38842.37
46219.45
69370.87
0.0


### Number of products

In [0]:
display(source_df.select(col('NumOfProducts')))

NumOfProducts
1
1
3
2
1
2
2
4
2
1


In [0]:
results = source_df.select(col('NumOfProducts')).groupBy('NumOfProducts').agg((count('NumOfProducts')/source_size).alias('prob')).collect()
num_of_products_labels = [x['NumOfProducts'] for x in results ]
num_of_products_prob = [x['prob'] for x in results ]
print(num_of_products_labels)
print(num_of_products_prob)

Estimated Salary

In [0]:
display(source_df.select(col('estimatedSalary').cast('int')).orderBy('estimatedSalary'))

estimatedSalary
11
90
91
96
106
123
142
143
178
216


In [0]:
display(source_df.select(min(col('estimatedSalary').cast('float')).alias('min_estimated_salary'),
                         max(col('estimatedSalary').cast('float')).alias('max_estimated_salary')))

min_estimated_salary,max_estimated_salary
11.58,199992.48


In [0]:
results = source_df.select(min(col('estimatedSalary').cast('float')).alias('min_estimated_salary'),max(col('estimatedSalary').cast('float')).alias('max_estimated_salary')).collect()
min_estimated_salary = results[0]['min_estimated_salary']
max_estimated_salary = results[0]['max_estimated_salary']
print(f"{max_estimated_salary:,}")

In [0]:
size= 10000
arr = np.random.uniform(low = 0.0, high = 1.0, size = size)  
arr = arr * max_estimated_salary
arr = np.round(arr, 2)      
df = pd.DataFrame({'estimated_salary':arr})
display(spark.createDataFrame(df))

estimated_salary
117075.3
173672.99
111354.09
166294.83
54997.72
134297.01
76942.25
102822.87
37213.1
39492.62


### Age

In [0]:
from pyspark.sql.functions import min,max
display(source_df.select(min('Age'),max('Age')))

min(Age),max(Age)
18,92


In [0]:
results = source_df.select(min('Age').alias('min_age'),max('Age').alias('max_age')).collect()
min_age,max_age=int(results[0]['min_age']),int(results[0]['max_age'])
print("Min age:",min_age)
print("Max age:",max_age)

### Exited

In [0]:
display(source_df.select('exited'))

exited
1
0
1
0
0
1
0
1
0
0


In [0]:
from pyspark.sql.functions import count
geo_df = source_df.select('exited').groupBy('exited').agg((count('exited')/source_size).alias('prob'))
results = geo_df.collect()
exited_labels = [x['exited'] for x in results ]
exited_prob = [x['prob'] for x in results ]
print('Exited Labels:',exited_labels)
print('Exited Probabilities:',exited_prob)

#Intoducing fake generator libraries

### pydbgen

In [0]:
from pydbgen import pydbgen
faker = pydbgen.pydb()

In [0]:
faker.fake.name() 

In [0]:
faker.fake.email() 

In [0]:
faker.fake.address()

In [0]:
faker.fake.company()

In [0]:
faker.simple_ph_num()

### Mimesis

In [0]:
from mimesis import Person
person = Person('en')
person.full_name()

In [0]:
person.email(domains=['mimesis.name'])

In [0]:
person.email(domains=['mimesis.name'], unique=True)

In [0]:
person.telephone(mask='1-4##-8##-5##3')

In [0]:
from mimesis import Business
business = Business()
business.company()

# Let's make it Spark™
In this section we are going to use pyspark pandas_udf to implement functions to generate new random data based on the evidences we collected in the previous section and using the fake data generator library mimesis

In [0]:
from pyspark.sql.functions import pandas_udf
from typing import Iterator
import pandas as pd

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType
from pyspark.sql.functions import col

NUMBER_OF_DOCUMENTS = 10**8
n_partitions = 16

from pyspark.mllib.random import RandomRDDs

from pyspark.sql.functions import lit, monotonically_increasing_id

rdd = RandomRDDs.uniformRDD(sc, NUMBER_OF_DOCUMENTS, n_partitions ).map(lambda x: (0,) )

fake_dataset_schema = schema = StructType([ 
    StructField("id",IntegerType(),False)
  ])

fake_df = spark.createDataFrame(rdd,schema=fake_dataset_schema)
fake_df = fake_df.select(monotonically_increasing_id().alias('id'))

### Generating fake personal information

In [0]:
from mimesis import Person

@pandas_udf('string')
def gen_fake_surname(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
  p = Person()
  for s in iterator:
    yield s.apply(lambda x: p.surname())
    
fake_df = fake_df.withColumn('name',gen_fake_surname(col('id')))

In [0]:
@pandas_udf('string')
def gen_fake_email(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
  p = Person()
  for s in iterator:
    yield s.apply(lambda x: p.email())
    
fake_df = fake_df.withColumn('email',gen_fake_email(col('id')))

In [0]:
from mimesis import Address

@pandas_udf('string')
def gen_fake_address(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    a = Address()
    for s in iterator:
      yield s.apply(lambda x: a.address())

fake_df = fake_df.withColumn('address',gen_fake_address(col('id')))

### Generating a fake age

In [0]:
def udf_gen_fake_age(min_age,max_age):
  @pandas_udf('int')
  def gen_fake_age(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    p = Person()
    for s in iterator:
      # Use that state for whole iterator.
      yield s.apply(lambda x: p.age(min_age,max_age))
  return gen_fake_age

fake_df = fake_df.withColumn('age',udf_gen_fake_age(min_age,max_age)(col('id')))

### Balance, credit score and estimated salary

In [0]:
import numpy as np

def udf_gen_fake_credit_score(min_credit_score:int=350, 
                              max_credit_score:int=850,
                              mean:int=500,
                              std:int=50 ):
  @pandas_udf('float')
  def gen_fake_credit_score(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
      for s in iterator:
        size = s.shape[0]
        # generate random integer
        arr = np.random.normal(mean, std, size).astype(int)
        # clip respecting boundaries
        arr = np.clip(arr, min_credit_score, max_credit_score)                                       
        yield pd.Series(arr) 

  return gen_fake_credit_score

fake_df = fake_df.withColumn('credit_score',udf_gen_fake_credit_score(min_credit_score,
                                                                      max_credit_score,
                                                                      mean_credit_score,
                                                                      std_credit_score)(col('id')))

In [0]:
def udf_gen_fake_balance(l:int=2, factor:int=1000):
  
  @pandas_udf('float')
  def gen_fake_balance(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
      for s in iterator:
        size = s.shape[0]
        arr = np.random.poisson(l, size)                                   # generate numbers from a poisson distribution
        arr = arr * np.random.uniform(low = 0.0, high = 1.0, size = size)  # add some noise
        arr = arr * factor                                                 # increase the scale by a factor
        arr = np.round(arr, 2)                                             # round it to 2 decimal places
        yield pd.Series(arr)

  return gen_fake_balance

fake_df = fake_df.withColumn('balance',udf_gen_fake_balance(1,mean_balance_score)(col('id')))


In [0]:
def udf_gen_fake_estimated_salary(max_salary:float=200000):
  
  @pandas_udf('float')
  def gen_fake_estimated_salary(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
      for s in iterator:
        size = s.shape[0]
        
        arr = np.random.uniform(low = 0.0, high = 1.0, size = size)  
        arr = arr * max_salary                                                
        arr = np.round(arr, 2)                                             
        yield pd.Series(arr)

  return gen_fake_estimated_salary

fake_df = fake_df.withColumn('estimated_salary',udf_gen_fake_estimated_salary(max_estimated_salary)(col('id')))

In [0]:

def udf_gen_fake_category(categories, probabilities):
  @pandas_udf('string')
  def gen_fake_category(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
     for s in iterator:
        size = s.shape[0]
        arr = np.random.choice(categories, size = size, p = probabilities)
        yield pd.Series(arr)

  return gen_fake_category


fake_df = fake_df.withColumn('geography',udf_gen_fake_category(geography_labels,geography_prob)(col('id')))
fake_df = fake_df.withColumn('num_of_products',udf_gen_fake_category(num_of_products_labels,num_of_products_prob)(col('id')).cast('int'))
fake_df = fake_df.withColumn('gender',udf_gen_fake_category(gender_labels,gender_prob)(col('id')))
fake_df = fake_df.withColumn('exited',udf_gen_fake_category(exited_labels,exited_prob)(col('id')))

In [0]:
fake_df.write.mode('overwrite').parquet("/mnt/fake/generated/")

In [0]:
print(f"Number of documents: {fake_df.count():,}")