# Generate Skewed Data 

## Import Modules

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Show the value of every bare expression in a cell, not only the last one;
# later cells rely on this to print several results from a single cell.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import random
import warnings
# Silence all Python warnings for the rest of the notebook to keep output readable.
# NOTE(review): both calls below install an "ignore everything" filter, so the
# second appears redundant — confirm before removing.
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [2]:
# Configure the driver resources step by step, then create (or reuse) the session.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.driver.memory", "8g")
session_builder = session_builder.config("spark.driver.cores", "4")
spark = session_builder.getOrCreate()

# Keep a handle on the SparkContext and limit console logging to errors.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

23/06/30 15:38:02 WARN Utils: Your hostname, Afaques-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.2 instead (on interface en0)
23/06/30 15:38:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/06/30 15:38:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/30 15:38:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Setting Up Transactions Data

In [3]:
# Raw CSV inputs, given relative to this notebook's location.
txns_file = "../../data/data_skew/raw_transactions.csv"
customers_file = "../../data/data_skew/raw_customers.csv"

In [4]:
# Load the raw transactions; the first CSV row holds the column names.
df_raw_txns = spark.read.option("header", True).csv(txns_file)

                                                                                

In [5]:
# Inspect the raw schema and a few untruncated sample rows.
df_raw_txns.printSchema()
df_raw_txns.show(n=3, truncate=False)

root
 |-- CUST_ID: string (nullable = true)
 |-- START_DATE: string (nullable = true)
 |-- END_DATE: string (nullable = true)
 |-- TRANS_ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- EXP_TYPE: string (nullable = true)
 |-- AMOUNT: string (nullable = true)

+----------+----------+--------+---------------+----------+----+-----+---+------------+------+
|CUST_ID   |START_DATE|END_DATE|TRANS_ID       |DATE      |YEAR|MONTH|DAY|EXP_TYPE    |AMOUNT|
+----------+----------+--------+---------------+----------+----+-----+---+------------+------+
|CI6XLYUMQK|2015-05-01|null    |T8I9ZB5A6X90UG8|2015-09-11|2015|9    |11 |Motor/Travel|20.27 |
|CI6XLYUMQK|2015-05-01|null    |TZ4JSLS7SC7FO9H|2017-02-08|2017|2    |8  |Motor/Travel|12.85 |
|CI6XLYUMQK|2015-05-01|null    |TTUKRDDJ6B6F42H|2015-08-01|2015|8    |1  |Housing     |383.8 |
+----------+----------+--------+---------

In [6]:
# Map each raw upper-case column name to the snake_case name used downstream.
txn_column_renames = {
    "CUST_ID": "cust_id",
    "START_DATE": "start_date",
    "END_DATE": "end_date",
    "TRANS_ID": "txn_id",
    "DATE": "date",
    "YEAR": "year",
    "MONTH": "month",
    "DAY": "day",
    "EXP_TYPE": "expense_type",
    "AMOUNT": "amt",
}

# Apply the renames one at a time, in the order listed above.
df_txns = df_raw_txns
for raw_name, clean_name in txn_column_renames.items():
    df_txns = df_txns.withColumnRenamed(raw_name, clean_name)

In [7]:
# Verify the renamed columns and preview a few untruncated rows.
df_txns.printSchema()
df_txns.show(n=3, truncate=False)

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)

+----------+----------+--------+---------------+----------+----+-----+---+------------+-----+
|cust_id   |start_date|end_date|txn_id         |date      |year|month|day|expense_type|amt  |
+----------+----------+--------+---------------+----------+----+-----+---+------------+-----+
|CI6XLYUMQK|2015-05-01|null    |T8I9ZB5A6X90UG8|2015-09-11|2015|9    |11 |Motor/Travel|20.27|
|CI6XLYUMQK|2015-05-01|null    |TZ4JSLS7SC7FO9H|2017-02-08|2017|2    |8  |Motor/Travel|12.85|
|CI6XLYUMQK|2015-05-01|null    |TTUKRDDJ6B6F42H|2015-08-01|2015|8    |1  |Housing     |383.8|
+----------+----------+--------+---------------+

# Setting Up Customer Data

In [8]:
# Load the raw customer details; the first CSV row holds the column names.
df_customer_det = spark.read.option("header", True).csv(customers_file)

In [9]:
# Inspect the customer schema and a few untruncated sample rows.
df_customer_det.printSchema()
df_customer_det.show(n=3, truncate=False)

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- zip: string (nullable = true)

+--------------+---+------+---------+-----+
|name          |age|gender|birthday |zip  |
+--------------+---+------+---------+-----+
|Carolyn Mathis|63 |Male  |3/7/1975 |49241|
|Anthony Lamb  |30 |Female|6/25/1987|37320|
|Eliza Bryan   |20 |Male  |9/13/1985|12568|
+--------------+---+------+---------+-----+
only showing top 3 rows



In [10]:
# Keep the 5,000 customers with the most distinct transactions and give each a
# sequential row_id (ordered by cust_id) used later to pair them with identities.
#
# cust_id is a secondary sort key so that customers tied on distinct_txns at the
# 5,000 cut-off are always selected in the same order; sorting on the count alone
# leaves the choice among ties up to Spark and makes the output non-reproducible.
df_top5k_customers = (
    df_txns
    .groupBy("cust_id")
    .agg(F.countDistinct("txn_id").alias("distinct_txns"))
    .orderBy(F.desc("distinct_txns"), F.asc("cust_id"))
    .limit(5000)
    # An un-partitioned window pulls all rows into one partition; acceptable for 5,000 rows.
    .withColumn("row_id", F.row_number().over(Window.orderBy("cust_id")))
)

In [11]:
# Number the customer detail rows alphabetically by name so they can be paired,
# row for row, with the numbered top customers.
name_order = Window.orderBy("name")
df_customer_det = df_customer_det.withColumn("row_id", F.row_number().over(name_order))

# Pair each top customer with one identity record, then drop the helper key.
df_customer_identity = (
    df_top5k_customers
    .join(df_customer_det, "row_id")
    .drop("row_id")
)

In [12]:
# Review the combined schema and a handful of untruncated rows.
df_customer_identity.printSchema()
df_customer_identity.show(n=5, truncate=False)

# Number of distinct customer ids, followed by the total row count.
df_customer_identity.select("cust_id").distinct().count()
df_customer_identity.count()

root
 |-- cust_id: string (nullable = true)
 |-- distinct_txns: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- zip: string (nullable = true)



                                                                                

+----------+-------------+-------------+---+------+----------+-----+
|cust_id   |distinct_txns|name         |age|gender|birthday  |zip  |
+----------+-------------+-------------+---+------+----------+-----+
|C007YEYTX9|7445         |Aaron Abbott |34 |Female|7/13/1991 |97823|
|C00B971T1J|7532         |Aaron Austin |37 |Female|12/16/2004|30332|
|C00WRSJF1Q|7777         |Aaron Barnes |29 |Female|3/11/1977 |23451|
|C01AZWQMF3|7548         |Aaron Barrett|31 |Male  |7/9/1998  |46613|
|C01BKUFRHA|7401         |Aaron Becker |54 |Male  |11/24/1979|40284|
+----------+-------------+-------------+---+------+----------+-----+
only showing top 5 rows



                                                                                

5000

                                                                                

5000

In [13]:
def assign_city() -> str:
    """Return one city name picked uniformly at random from a fixed set of ten."""
    candidate_cities = (
        'san_francisco',
        'new_york',
        'chicago',
        'philadelphia',
        'boston',
        'seattle',
        'san_diego',
        'los_angeles',
        'denver',
        'portland',
    )
    picked_city = random.choice(candidate_cities)
    return picked_city

# Wrap assign_city as a Spark UDF that returns a string.
# Spark treats UDFs as deterministic by default and may then duplicate or
# eliminate invocations during optimization; because assign_city is random,
# it is explicitly marked as non-deterministic.
assign_city_udf = udf(assign_city, StringType()).asNondeterministic()

In [14]:
# Attach a randomly chosen city to every customer.
# The city comes from a random UDF, so it would be drawn again each time the
# DataFrame is evaluated. Caching keeps the values computed by the first action
# so that later previews and the Parquet write see the same city per customer.
df_customer_identity = (
    df_customer_identity
    .withColumn("city", assign_city_udf())
    .cache()
)

In [15]:
# Preview the first ten customers, untruncated, including the new city column.
df_customer_identity.show(n=10, truncate=False)

                                                                                

+----------+-------------+-------------+---+------+----------+-----+------------+
|cust_id   |distinct_txns|name         |age|gender|birthday  |zip  |city        |
+----------+-------------+-------------+---+------+----------+-----+------------+
|C007YEYTX9|7445         |Aaron Abbott |34 |Female|7/13/1991 |97823|denver      |
|C00B971T1J|7532         |Aaron Austin |37 |Female|12/16/2004|30332|portland    |
|C00WRSJF1Q|7777         |Aaron Barnes |29 |Female|3/11/1977 |23451|philadelphia|
|C01AZWQMF3|7548         |Aaron Barrett|31 |Male  |7/9/1998  |46613|philadelphia|
|C01BKUFRHA|7401         |Aaron Becker |54 |Male  |11/24/1979|40284|boston      |
|C01RGUNJV9|8280         |Aaron Bell   |24 |Female|8/16/1968 |86331|los_angeles |
|C01USDV4EE|7177         |Aaron Blair  |35 |Female|9/9/1974  |80078|philadelphia|
|C01WMZQ7PN|8617         |Aaron Brady  |51 |Female|8/20/1994 |52204|portland    |
|C021567NJZ|7260         |Aaron Briggs |57 |Male  |3/10/1990 |22008|new_york    |
|C023M6MKR3|7210

# Write Customer Data

In [16]:
# The transaction count was only needed for selection; drop it and persist the
# customers as Parquet, replacing any previous output.
customers_out = df_customer_identity.drop("distinct_txns")
customers_out.write.parquet("../../data/data_skew/customers.parquet", mode="overwrite")

                                                                                

# Write Skewed Transaction Data

In [17]:
# Optional check: count the top customers that have at least 7,000 distinct transactions.
# df_top5k_customers.filter(F.col("distinct_txns") >= 7000).distinct().count()

In [18]:
# Customers with at least 8,000 distinct transactions are all folded into one
# customer id, which creates a single heavily skewed key.
is_heavy_customer = F.col("distinct_txns") >= 8000
skewed_cust_id = (
    F.when(is_heavy_customer, F.lit("C0YDPQWPBJ"))
    .otherwise(F.col("cust_id"))
)

# Keep only transactions of the top customers, then overwrite cust_id with the skewed id.
df_transactions = (
    df_txns
    .join(df_top5k_customers, on="cust_id", how="inner")
    .withColumn("cust_id", skewed_cust_id)
)

In [19]:
# Tag every transaction with a randomly chosen city.
random_city = assign_city_udf()
df_transactions = df_transactions.withColumn("city", random_city)

In [20]:
# Register the cache BEFORE the first action. Spark only fills a cache while an
# action runs, so calling cache() after show() meant the join was computed once
# for this count and again by a later cell.
df_transactions.cache()

# Top five customer ids by transaction count; the merged id should dominate.
df_transactions.groupBy("cust_id").count().orderBy(F.desc("count")).show(5, False)

                                                                                

+----------+--------+
|cust_id   |count   |
+----------+--------+
|C0YDPQWPBJ|17539732|
|CBW3FMEAU7|7999    |
|C3KUDEN3KO|7999    |
|C89FCEGPJP|7999    |
|CHNFNR89ZV|7998    |
+----------+--------+
only showing top 5 rows



DataFrame[cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, distinct_txns: bigint, row_id: int, city: string]

In [21]:
# Print the schema of the joined transactions, including the helper columns
# brought in by the join and the added city column.
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- distinct_txns: long (nullable = false)
 |-- row_id: integer (nullable = true)
 |-- city: string (nullable = true)



In [22]:
# Transaction counts per (customer, city) pair, largest first, to see how the
# skewed key is spread across cities.
city_counts = df_transactions.groupBy("cust_id", "city").count()
city_counts.orderBy(F.desc("count")).show(n=20, truncate=False)

                                                                                

+----------+-------------+-------+
|cust_id   |city         |count  |
+----------+-------------+-------+
|C0YDPQWPBJ|portland     |1756379|
|C0YDPQWPBJ|los_angeles  |1755910|
|C0YDPQWPBJ|denver       |1755398|
|C0YDPQWPBJ|san_francisco|1754952|
|C0YDPQWPBJ|seattle      |1754184|
|C0YDPQWPBJ|chicago      |1753398|
|C0YDPQWPBJ|boston       |1752906|
|C0YDPQWPBJ|san_diego    |1752767|
|C0YDPQWPBJ|philadelphia |1752140|
|C0YDPQWPBJ|new_york     |1751698|
|C4XLI291DF|denver       |877    |
|COYZEFEC9N|portland     |876    |
|CLU2H1C3GZ|portland     |874    |
|CNT7TK3O4N|new_york     |873    |
|CJJ0NUQIUD|chicago      |873    |
|C41YIKBMNX|los_angeles  |870    |
|CDSOKODPKL|chicago      |867    |
|CMMS3KUZ6S|philadelphia |867    |
|C3S3XFH3L3|portland     |866    |
|CZVAF6O8HI|san_francisco|865    |
+----------+-------------+-------+
only showing top 20 rows



In [23]:
# Checks to validate if data is sane

# df_transactions.select("cust_id").distinct().count()
# df_transactions.select("txn_id").distinct().count()
# df_transactions.count()

In [24]:
# Remove the helper columns used while building the skew and persist the
# transactions as Parquet, replacing any previous output.
transactions_out = df_transactions.drop("distinct_txns", "row_id")
transactions_out.write.parquet("../../data/data_skew/transactions.parquet", mode="overwrite")

                                                                                

In [25]:
# df_transactions_test = spark.read.parquet("../../data/data_skew/transactions.parquet")

In [26]:
# Checks to validate if data is sane

# (
#     df_transactions_test
#     .groupBy("cust_id")
#     .agg(F.countDistinct("txn_id").alias("ct"))
#     .orderBy(F.desc("ct"))
#     .show(20, False)
# )

In [27]:
# Stop the Spark session created at the top of the notebook.
spark.stop()