In [2]:
#****************************************************************************
# (C) Cloudera, Inc. 2020-2023
#  All rights reserved.
#
#  Applicable Open Source License: GNU Affero General Public License v3.0
#
#  NOTE: Cloudera open source products are modular software products
#  made up of hundreds of individual components, each of which was
#  individually copyrighted.  Each Cloudera open source product is a
#  collective work under U.S. Copyright Law. Your license to use the
#  collective work is as provided in your written agreement with
#  Cloudera.  Used apart from the collective work, this file is
#  licensed for your use pursuant to the open source license
#  identified above.
#
#  This code is provided to you pursuant a written agreement with
#  (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
#  this code. If you do not have a written agreement with Cloudera nor
#  with an authorized and properly licensed third party, you do not
#  have any rights to access nor to use this code.
#
#  Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
#  contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
#  KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
#  WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
#  IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
#  FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
#  AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
#  ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
#  OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
#  CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
#  RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
#  BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
#  DATA.
#
# #  Author(s): Paul de Fusco
#***************************************************************************/

In [1]:
import os
import numpy as np
import pandas as pd
from datetime import datetime
import dbldatagen as dg
import dbldatagen.distributions as dist
from dbldatagen import FakerTextFactory, DataGenerator, fakerText
from faker.providers import bank, credit_card, currency
from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, \
                              DoubleType, BooleanType, ShortType, \
                              TimestampType, DateType, DecimalType, \
                              ByteType, BinaryType, ArrayType, MapType, \
                              StructType, StructField

class BankDataGen:

    '''Class to Generate Banking Data'''

    def __init__(self, spark):
        self.spark = spark

    def transactionsDataGen(self, shuffle_partitions_requested = 10, partitions_requested = 10, data_rows = 1000000):

        # setup use of Faker
        FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

        # partition parameters etc.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested, randomSeed=42)
                    .withColumn("credit_card_number", "int", minValue=12345678901234, maxValue=-123456789012340, step=1)
                    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider") )
                    .withColumn("transaction_type", "string", values=["purchase", "cash_advance"], random=True, weights=[9, 1])
                    .withColumn("event_ts", "timestamp", begin="2023-01-01 01:00:00",end="2023-12-31 23:59:00",interval="1 minute", random=True)
                    .withColumn("longitude", "float", minValue=-125, maxValue=-66.9345, random=True)
                    .withColumn("latitude", "float", minValue=24.3963, maxValue=49.3843, random=True)
                    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
                    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
                    )

        df = fakerDataspec.build()

        df = df.withColumn('credit_card_number', df['credit_card_number'].cast(StringType()))

        return df


    def piiDataGen(self, shuffle_partitions_requested = 10, partitions_requested = 10, data_rows = 10000):

        # setup use of Faker
        FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

        # partition parameters etc.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested, randomSeed=42)
                    .withColumn("name", percentNulls=0.1, text=FakerTextUS("name") )
                    .withColumn("address", text=FakerTextUS("address" ))
                    .withColumn("email", text=FakerTextUS("ascii_company_email") )
                    .withColumn("aba_routing", text=FakerTextUS("aba" ))
                    .withColumn("bank_country", text=FakerTextUS("bank_country") )
                    .withColumn("account_no", text=FakerTextUS("bban" ))
                    .withColumn("int_account_no", text=FakerTextUS("iban") )
                    .withColumn("swift11", text=FakerTextUS("swift11" ))
                    .withColumn("credit_card_number", "int", minValue=12345678901234, maxValue=-123456789012340, step=1)
                    )

        df = fakerDataspec.build()

        df = df.withColumn('credit_card_number', df['credit_card_number'].cast(StringType()))


        return df


    def sameCardDataGen(self, shuffle_partitions_requested = 10, partitions_requested = 10, data_rows = 10000000):

        # setup use of Faker
        FakerTextUS = FakerTextFactory(locale=['en_US'], providers=[bank])

        # partition parameters etc.
        self.spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

        fakerDataspec = (DataGenerator(self.spark, rows=data_rows, partitions=partitions_requested, randomSeed=42)
                    .withColumn("credit_card_number", "string", values=["12345678901234"])
                    .withColumn("credit_card_provider", text=FakerTextUS("credit_card_provider") )
                    .withColumn("transaction_type", "string", values=["purchase", "cash_advance"], random=True, weights=[9, 1])
                    .withColumn("event_ts", "timestamp", begin="2023-01-01 01:00:00",end="2023-12-31 23:59:00",interval="1 minute", random=True)
                    .withColumn("longitude", "float", minValue=-125, maxValue=-66.9345, random=True)
                    .withColumn("latitude", "float", minValue=24.3963, maxValue=49.3843, random=True)
                    .withColumn("transaction_currency", values=["USD", "EUR", "KWD", "BHD", "GBP", "CHF", "MEX"])
                    .withColumn("transaction_amount", "decimal", minValue=0.01, maxValue=30000, random=True)
                    )

        df = fakerDataspec.build()

        #df = df.withColumn('credit_card_number', df['credit_card_number'].cast(StringType()))

        return df
        

In [2]:
import cml.data_v1 as cmldata

# Sample in-code customization of spark configurations
#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.cores', '1')
#SparkContext.setSystemProperty('spark.executor.memory', '2g')

CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

myDG = BankDataGen(spark)

sameCardDf = myDG.sameCardDataGen()
transactionsDf = myDG.transactionsDataGen()

allTransDf = sameCardDf.union(transactionsDf)

allTransDf.write.format("parquet").mode("overwrite").save("s3a://go01-demo/datalake/pdefusco/transactions/")

piiDf = myDG.piiDataGen()
piiDf.write.format("csv").option("header",True).mode("overwrite").save("s3a://go01-demo/datalake/pdefusco/pii/piiData.csv")

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


Spark Application Id:spark-1814a99a5fd644f79aece1773c8b0471


                                                                                

In [4]:
#allTransDf.count()

In [5]:
allTransDf.groupBy("credit_card_number").count().orderBy("credit_card_number").show()



+------------------+--------+
|credit_card_number|   count|
+------------------+--------+
|    12345678901234|10000000|
|        1942892530|       1|
|        2045911182|       1|
|        2045911183|       1|
|        2045911184|       1|
|        2045911185|       1|
|        2045911186|       1|
|        2045911187|       1|
|        2045911188|       1|
|        2045911189|       1|
|        2045911190|       1|
|        2045911191|       1|
|        2045911192|       1|
|        2045911193|       1|
|        2045911194|       1|
|        2045911195|       1|
|        2045911196|       1|
|        2045911197|       1|
|        2045911198|       1|
|        2045911199|       1|
+------------------+--------+
only showing top 20 rows



                                                                                

In [6]:
# Inner join
transactionsDf.join(piiDf,transactionsDf.credit_card_number ==  piiDf.credit_card_number,"inner").show(truncate=False)

                                                                                

+------------------+---------------------------+----------------+-------------------+---------+--------+--------------------+------------------+-----------------+-----------------------------------------------------------+-------------------------------+-----------+------------+------------------+----------------------+-----------+------------------+
|credit_card_number|credit_card_provider       |transaction_type|event_ts           |longitude|latitude|transaction_currency|transaction_amount|name             |address                                                    |email                          |aba_routing|bank_country|account_no        |int_account_no        |swift11    |credit_card_number|
+------------------+---------------------------+----------------+-------------------+---------+--------+--------------------+------------------+-----------------+-----------------------------------------------------------+-------------------------------+-----------+------------+---------------

                                                                                

In [7]:
df_txn_details = (
    allTransDf.join(
        piiDf,
        on="credit_card_number",
        how="inner"
    )
)

In [8]:
import time
start_time = time.time()
df_txn_details.count()
print(f"time taken: {time.time() - start_time}")

time taken: 1.1461257934570312
