<a href="https://colab.research.google.com/github/manhbd-22022602/big_data_for_beginners/blob/main/generate_data_with_Faker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<a href="https://github.com/groda/big_data"><div><img src="https://github.com/groda/big_data/blob/master/logo_bdb.png?raw=true" align=right width="90"></div></a>

# Data Generation and Aggregation with Python's Faker Library and PySpark

Explore the capabilities of the Python Faker library (https://faker.readthedocs.io/) for dynamic data generation!

Whether you're a data scientist, engineer, or analyst, this tutorial will guide you through the process of creating realistic and diverse datasets using Faker and then harnessing the distributed computing capabilities of PySpark to aggregate and analyze the generated data.



**Note:** Strictly speaking, this is not _synthetic_ data: it is generated using simple methods and will most likely not fit any real-life distribution. Still, it is a valuable resource for testing when authentic data is unavailable.

# Install Faker

The Python `faker` module needs to be installed. Note that on Google Colab you can use `!pip` as well as just `pip` (no exclamation mark).

In [37]:
!pip install faker



# Generate a Pandas dataframe with fake data

Import `Faker` and set a random seed ($42$).

In [38]:
from faker import Faker
# Set the seed value of the shared `random.Random` object
# across all internal generators that will ever be created
Faker.seed(42)

`fake` is a fake data generator with the `de_DE` locale.

In [39]:
fake = Faker('de_DE')
fake.seed_locale('de_DE', 42)
# Creates and seeds a unique `random.Random` object for
# each internal generator of this `Faker` instance
fake.seed_instance(42)
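
The effect of seeding can be illustrated without Faker. The comments above refer to `random.Random` objects, so the following minimal sketch uses the standard library directly; `sample_ages` is a hypothetical helper and not part of Faker.

```python
import random

def sample_ages(seed, n=5):
    """Draw n integers in [0, 100] from a generator seeded with `seed`."""
    rng = random.Random(seed)
    return [rng.randint(0, 100) for _ in range(n)]

first = sample_ages(42)
second = sample_ages(42)

# The same seed reproduces the same sequence
print(first == second)  # True
```

This is why re-running the seeded cells in the same order should reproduce the same fake records, assuming the Faker version is unchanged.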

Import Pandas to store the data in a dataframe.

In [40]:
import sys
# True if running on Google Colab
IN_COLAB = 'google.colab' in sys.modules
if not IN_COLAB:
    !pip install pandas==1.5.3

import pandas as pd

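The function `create_row_faker` creates `num` rows of fake data (one row by default). Here we choose to generate rows containing the following fields:
 - `name`: `fake.name()`
 - `age`: `fake.random_int(0, 100)`
 - `postcode`: `fake.postcode()`
 - `email`: `fake.email()`
 - `nationality`: `fake.country()`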

In [41]:
def create_row_faker(num=1):
    """Return a list of `num` dicts, each holding one row of fake data."""
    return [{"name": fake.name(),
             "age": fake.random_int(0, 100),
             "postcode": fake.postcode(),
             "email": fake.email(),
             "nationality": fake.country(),
             } for _ in range(num)]

Generate a single row

In [42]:
create_row_faker()

[{'name': 'Aleksandr Weihmann',
  'age': 35,
  'postcode': '32181',
  'email': 'bbeckmann@example.org',
  'nationality': 'Fidschi'}]

Generate a dataframe `df_fake` of 5000 rows using `create_row_faker`.

We're using the _cell magic_ `%%time` to time the operation.

In [43]:
%%time
df_fake = pd.DataFrame(create_row_faker(5000))

CPU times: user 616 ms, sys: 8.62 ms, total: 625 ms
Wall time: 949 ms
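
`%%time` only works in IPython and Jupyter. In a plain Python script, a comparable wall-clock measurement can be taken with `time.perf_counter`. The helper name `timed` below is made up for this sketch.

```python
import time

def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed

# Example with a cheap built-in function
total, seconds = timed(sum, range(1_000_000))
print(f"sum={total}, wall time: {seconds:.4f} s")
```

To time the dataframe creation itself, one could pass a function that does all the work, for example `timed(lambda: pd.DataFrame(create_row_faker(5000)))`.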


View dataframe

In [44]:
df_fake

| | name | age | postcode | email | nationality |
|---|---|---|---|---|---|
| 0 | Prof. Kurt Bauer B.A. | 91 | 37940 | hildaloechel@example.com | Guatemala |
| 1 | Ekkehart Wiek-Kallert | 13 | 61559 | maja07@example.net | Brasilien |
| 2 | Annelise Rohleder-Hornig | 80 | 93103 | daniel31@example.com | Guatemala |
| 3 | Magrit Knappe B.A. | 47 | 34192 | gottliebmisicher@example.com | Guadeloupe |
| 4 | Univ.Prof. Gotthilf Wilmsen B.Sc. | 29 | 56413 | heini76@example.net | Litauen |
| ... | ... | ... | ... | ... | ... |
| 4995 | Herr Arno Ebert B.A. | 63 | 36790 | josefaebert@example.org | Slowenien |
| 4996 | Miroslawa Schüler | 22 | 11118 | ruppersbergerbetina@example.org | Republik Moldau |
| 4997 | Janusz Nerger | 74 | 33091 | ann-kathrinseip@example.net | Belarus |
| 4998 | Frau Cathleen Bähr | 97 | 89681 | hethurhubertus@example.org | St. Barthélemy |


For more fake data generators see Faker's [standard providers](https://faker.readthedocs.io/en/master/providers.html#standard-providers) as well as [community providers](https://faker.readthedocs.io/en/master/communityproviders.html#community-providers).

# Generate PySpark dataframe with fake data

Install PySpark.

In [45]:
!pip install pyspark



In [46]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Faker demo") \
    .getOrCreate()

In [47]:
df = spark.createDataFrame(create_row_faker(5000))

Depending on the PySpark version, creating a dataframe from a list of dictionaries may emit a warning that inferring the schema from `dict` is deprecated. To avoid the warning, either use [pyspark.sql.Row](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row) and let Spark infer the datatypes, or create a schema for the dataframe that specifies the datatypes of all fields (here's the list of all [datatypes](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=types#module-pyspark.sql.types)).

In [48]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField('name', StringType()),
                     StructField('age', IntegerType()),
                     StructField('postcode', StringType()),
                     StructField('email', StringType()),
                     StructField('nationality', StringType())])

In [49]:
df = spark.createDataFrame(create_row_faker(5000), schema)

In [50]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- postcode: string (nullable = true)
 |-- email: string (nullable = true)
 |-- nationality: string (nullable = true)



Let's generate some more data (a dataframe with $5\cdot10^4$ rows). The dataframe will be partitioned by Spark.

In [51]:
%%time
n = 5*10**4
df = spark.createDataFrame(create_row_faker(n), schema)

CPU times: user 4.2 s, sys: 29.2 ms, total: 4.23 s
Wall time: 4.47 s


It took a long time (~4 sec. for 50000 rows)!

Can we do better?

The function `create_row_faker()` returns a list, which is built completely in memory before Spark ever sees it. Perhaps a _generator_ would be more efficient.

In [52]:
d = create_row_faker(5)
# what type is d?
type(d)

list

Let's build `d` as a generator instead:

In [53]:
d = ({"name": fake.name(),
      "age": fake.random_int(0, 100),
      "postcode": fake.postcode(),
      "email": fake.email(),
      "nationality": fake.country()} for i in range(5))
# what type is d?
type(d)

generator
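
The difference between the two can be sketched with the standard library alone. Here `make_row` is a hypothetical stand-in for the Faker calls that records every invocation, so we can see when the rows are actually built.

```python
def make_row(i, log):
    """Hypothetical stand-in for the Faker calls; records each invocation."""
    log.append(i)
    return {"id": i}

log = []

rows_list = [make_row(i, log) for i in range(5)]  # all 5 rows are built right away
calls_after_list = len(log)

rows_gen = (make_row(i, log) for i in range(5))   # nothing is built yet
calls_after_gen = len(log)

materialized = list(rows_gen)                     # consuming the generator builds the rows
calls_after_consume = len(log)

print(calls_after_list, calls_after_gen, calls_after_consume)  # 5 5 10
```

A generator defers the work but does not reduce it: every row still has to be produced once when the generator is consumed.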

In [54]:
%%time
n = 5*10**4
d = ({"name": fake.name(),
      "age": fake.random_int(0, 100),
      "postcode": fake.postcode(),
      "email": fake.email(),
      "nationality": fake.country()}
     for i in range(n))
df = spark.createDataFrame(d, schema)

CPU times: user 6.25 s, sys: 48.6 ms, total: 6.3 s
Wall time: 9.67 s


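This wasn't faster. The generator only postpones the work: `createDataFrame` still has to consume every row on the driver before it can distribute the data, so the run time is dominated by the same sequence of Faker calls.

I will look into how one can leverage Spark's parallelism to generate dataframes and speed up the process.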

## Filter and aggregate with PySpark

In [55]:
type(df)

Show the first five records in the dataframe of fake data.

In [56]:
df.show(n=5, truncate=False)

+-----------------------+---+--------+--------------------------+----------------------------+
|name                   |age|postcode|email                     |nationality                 |
+-----------------------+---+--------+--------------------------+----------------------------+
|Hans-Willi Seidel B.Sc.|57 |22848   |alangern@example.net      |Neukaledonien               |
|Virginia Karz-Holt     |97 |56097   |susannnohlmans@example.net|Cookinseln                  |
|Canan Mosemann         |54 |40881   |klaus-d60@example.net     |Usbekistan                  |
|Friedhilde Mude        |18 |60167   |hilma52@example.org       |Zentralafrikanische Republik|
|Agata Sauer-Etzler     |35 |21379   |barkholzhenri@example.com |Kasachstan                  |
+-----------------------+---+--------+--------------------------+----------------------------+
only showing top 5 rows



Do some data aggregation:
 - group by postcode
 - count the number of persons and the average age for each postcode
 - filter out postcodes with fewer than 4 persons
 - sort by average age descending
 - show the first 5 entries

In [57]:
import pyspark.sql.functions as F
df.groupBy('postcode') \
  .agg(F.count('postcode').alias('Count'), F.round(F.avg('age'), 2).alias('Average age')) \
  .filter('Count>3') \
  .orderBy('Average age', ascending=False) \
  .show(5)

+--------+-----+-----------+
|postcode|Count|Average age|
+--------+-----+-----------+
|   18029|    4|      91.75|
|   67611|    4|       87.0|
|   47898|    4|       85.5|
|   46755|    4|       78.5|
|   84546|    4|       77.0|
+--------+-----+-----------+
only showing top 5 rows
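
To make the individual steps concrete, here is the same pipeline written in plain Python on a handful of made-up rows. The postcodes and ages are invented for illustration and do not come from the dataframe above.

```python
from collections import defaultdict

# Made-up (postcode, age) pairs, for illustration only
rows = [("10115", 30), ("10115", 40), ("10115", 50), ("10115", 60),
        ("20095", 70), ("20095", 80),
        ("80331", 20), ("80331", 22), ("80331", 24), ("80331", 26), ("80331", 28)]

# group by postcode
ages_by_postcode = defaultdict(list)
for postcode, age in rows:
    ages_by_postcode[postcode].append(age)

# count and average age per postcode, keeping only groups with more than 3 persons
summary = [(postcode, len(ages), round(sum(ages) / len(ages), 2))
           for postcode, ages in ages_by_postcode.items()
           if len(ages) > 3]

# sort by average age, descending
summary.sort(key=lambda item: item[2], reverse=True)
print(summary)  # [('10115', 4, 45.0), ('80331', 5, 24.0)]
```

Postcode `20095` has the highest average age in this toy data but is dropped because it has only two persons, which is the effect of the `Count>3` filter.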



Among the postcodes with at least 4 persons, $18029$ has the highest average age ($91.75$). Show all entries for postcode $18029$ using `filter`.

In [58]:
# postcode is a string column, so compare it against a string literal
df.filter("postcode == '18029'").show(truncate=False)

+---------------------------+---+--------+-----------------------------+------------------+
|name                       |age|postcode|email                        |nationality       |
+---------------------------+---+--------+-----------------------------+------------------+
|Univ.Prof. Roderich Lehmann|89 |18029   |anne-katrinscholl@example.com|Grönland          |
|Herwig Martin B.A.         |90 |18029   |steckelgerta@example.com     |Amerikanisch-Samoa|
|Univ.Prof. Mijo Walter     |92 |18029   |hoevelantonius@example.net   |Niederlande       |
|Aynur Karz B.Eng.          |96 |18029   |cschleich@example.com        |Puerto Rico       |
+---------------------------+---+--------+-----------------------------+------------------+



# Another example with multiple locales and weights

We are going to use multiple locales with weights (following the [examples](https://faker.readthedocs.io/en/master/fakerclass.html#examples) in the documentation).

Here's the [list of all available locales](https://faker.readthedocs.io/en/master/locales.html).

In [59]:
from faker import Faker
# set a seed for the random generator
Faker.seed(0)

Generate data with locales `de_DE` and `de_AT` with weights $5$ and $2$, respectively.

The distribution of locales will be:
 - `de_DE` - $71.43\%$ of the time ($5 / (5+2)$)
 - `de_AT` - $28.57\%$ of the time ($2 / (5+2)$)
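
The percentages above can be checked with a few lines of standard-library Python. The sampling part uses `random.choices` purely as an illustration of weighted selection; it is not a reproduction of how Faker picks a locale internally.

```python
import random
from collections import Counter

weights = {"de_DE": 5, "de_AT": 2}
total = sum(weights.values())

# Expected share of each locale: weight / sum of weights
expected = {locale: weight / total for locale, weight in weights.items()}
print({locale: f"{share:.2%}" for locale, share in expected.items()})
# {'de_DE': '71.43%', 'de_AT': '28.57%'}

# Empirical check with weighted sampling (not Faker's own code path)
rng = random.Random(0)
draws = rng.choices(list(weights), weights=list(weights.values()), k=70_000)
observed = {locale: count / len(draws) for locale, count in Counter(draws).items()}
print(observed)
```

With this many draws the observed shares should land close to the expected ones.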


In [60]:
from collections import OrderedDict
locales = OrderedDict([
    ('de_DE', 5),
    ('de_AT', 2),
])
fake = Faker(locales)
fake.seed_instance(42)
fake.locales

['de_DE', 'de_AT']

In [61]:
fake.seed_locale('de_DE', 0)
fake.seed_locale('de_AT', 0)

In [62]:
fake.profile(fields=['name', 'birthdate', 'sex', 'blood_group',
                     'mail', 'current_location'])

{'current_location': (Decimal('26.547114'), Decimal('-10.243190')),
 'blood_group': 'B-',
 'name': 'Axel Jung',
 'sex': 'M',
 'mail': 'claragollner@gmail.com',
 'birthdate': datetime.date(2003, 8, 8)}

In [63]:
from pyspark.sql.types import *
location = StructField('current_location',
                       StructType([StructField('lat', DecimalType()),
                                   StructField('lon', DecimalType())])
                      )
schema = StructType([StructField('name', StringType()),
                     StructField('birthdate', DateType()),
                     StructField('sex', StringType()),
                     StructField('blood_group', StringType()),
                     StructField('mail', StringType()),
                     location
                     ])

In [64]:
fake.profile(fields=['name', 'birthdate', 'sex', 'blood_group',
                     'mail', 'current_location'])

{'current_location': (Decimal('79.153888'), Decimal('-0.003034')),
 'blood_group': 'B-',
 'name': 'Dr. Anita Suppan',
 'sex': 'F',
 'mail': 'schauerbenedict@kabsi.at',
 'birthdate': datetime.date(1980, 4, 20)}

In [65]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Faker demo - part 2") \
    .getOrCreate()

Create dataframe with $5\cdot10^3$ rows.

In [66]:
%%time
n = 5*10**3
d = (fake.profile(fields=['name', 'birthdate', 'sex', 'blood_group',
                          'mail', 'current_location'])
     for i in range(n))
df = spark.createDataFrame(d, schema)

CPU times: user 3.07 s, sys: 10.9 ms, total: 3.08 s
Wall time: 3.58 s


In [67]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- sex: string (nullable = true)
 |-- blood_group: string (nullable = true)
 |-- mail: string (nullable = true)
 |-- current_location: struct (nullable = true)
 |    |-- lat: decimal(10,0) (nullable = true)
 |    |-- lon: decimal(10,0) (nullable = true)



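Note how `current_location` represents a _tuple_ data structure (a `StructType` of `StructField`s). Since `DecimalType()` without arguments means `decimal(10,0)`, latitude and longitude are stored without decimal places; a type such as `DecimalType(9, 6)` would keep them.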

In [68]:
df.show(n=10, truncate=False)

+---------------------------+----------+---+-----------+-------------------------+----------------+
|name                       |birthdate |sex|blood_group|mail                     |current_location|
+---------------------------+----------+---+-----------+-------------------------+----------------+
|Prof. Valentine Niemeier   |1979-05-24|F  |B-         |maricagotthard@aol.de    |{74, 164}       |
|Magrit Graf                |1943-03-27|F  |A-         |hartungclaudio@web.de    |{-86, -34}      |
|Harriet Weiß-Liebelt       |1960-02-03|F  |AB+        |heserhilma@gmail.com     |{20, 126}       |
|Marisa Heser               |1919-03-08|F  |B-         |meinhard55@web.de        |{73, 169}       |
|Alexa Loidl-Schönberger    |1934-03-11|F  |O-         |hannafroehlich@gmail.com |{-23, -117}     |
|Rosa-Maria Schwital B.Sc.  |1927-08-20|F  |O-         |johannessauer@yahoo.de   |{2, -113}       |
|Herr Roland Caspar B.Sc.   |1932-03-21|M  |O-         |weinholdslawomir@yahoo.de|{24, 100}       |
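
The coordinates in this output are whole numbers because the schema printed `decimal(10,0)`, that is, a scale of 0. What the scale does to a value can be sketched with Python's `decimal` module. This only mimics the effect: the rounding mode used here is an assumption, and Spark's own conversion is not reproduced.

```python
from decimal import Decimal, ROUND_HALF_UP

lat = Decimal("26.547114")  # latitude from the profile shown earlier

# Scale 0: no digits after the decimal point
lat_scale0 = lat.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
# Scale 6: six digits after the decimal point
lat_scale6 = lat.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)

print(lat_scale0)  # 27
print(lat_scale6)  # 26.547114
```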


# Save to Parquet

[Write](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=write#pyspark.sql.DataFrameWriter.parquet) the dataframe to a Parquet file ([Parquet](http://parquet.apache.org/) is a compressed, efficient columnar data representation compatible with all frameworks in the Hadoop ecosystem):

In [69]:
df.write.mode("overwrite").parquet("fakedata.parquet")

Check the size of the Parquet file (it is actually a directory containing one file per partition):

In [70]:
!du -h fakedata.parquet

188K	fakedata.parquet


In [71]:
!ls -lh fakedata.parquet

total 172K
-rw-r--r-- 1 root root 71K Apr 24 02:08 part-00000-b28115e0-4f0e-4da3-8c78-ee511cd3b34e-c000.snappy.parquet
-rw-r--r-- 1 root root 98K Apr 24 02:08 part-00001-b28115e0-4f0e-4da3-8c78-ee511cd3b34e-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Apr 24 02:08 _SUCCESS
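
Outside a notebook, where the `!du` shell escape is not available, the size of the output directory can be estimated in Python. The helper `directory_size` is a hypothetical name for this sketch. It sums file sizes in bytes, whereas `du` reports disk usage, so the two numbers may differ slightly.

```python
import os

def directory_size(path):
    """Return the total size in bytes of all files below `path`."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

# Only report a size if the Parquet directory from the previous step exists
if os.path.isdir("fakedata.parquet"):
    print(f"{directory_size('fakedata.parquet') / 1024:.0f} KiB")
```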


# Stop Spark session

Don't forget to close the Spark session when you're done!

In [72]:
spark.stop()