In [2]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta
from pyspark.sql.functions import substring, lit
import random




In [3]:
# Build the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("GenerateData")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/01 13:43:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Pools of sample values that the synthetic records are drawn from.
first_names = ["Alice", "Bob", "Charlie", "David", "Emily", "Frank", "Grace", "Henry", "Isabella", "Jack"]
last_names = ["Smith", "Jones", "Williams", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor", "Anderson"]
addresses = ["123 Main St", "456 Elm St", "789 Oak St", "1011 Pine St", "1213 Willow St"]

# Inclusive range that generated dates of birth may fall in.
start_date = date(1970, 1, 1)
end_date = date(2022, 12, 31)
num_days = (end_date - start_date).days

# Build 100 synthetic (first_name, last_name, address, date_of_birth) rows.
data = []
for _ in range(100):
    first_name = random.choice(first_names)
    last_name = random.choice(last_names)
    address = random.choice(addresses)
    # randint includes both endpoints, so an offset of num_days (end_date itself)
    # can be drawn; randrange(num_days) could never produce end_date.
    random_days = random.randint(0, num_days)
    date_of_birth = start_date + timedelta(days=random_days)
    data.append((first_name, last_name, address, date_of_birth))

# Column layout for the generated records: three nullable strings and a nullable date.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("address", StringType()),
        ("date_of_birth", DateType()),
    )
])


In [6]:
# Turn the generated rows into a DataFrame and save it as CSV with a header row,
# overwriting any output that already exists at that path.
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").option("header", True).csv("generated_data.csv")



24/09/01 13:43:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [7]:
# Print a preview of the generated DataFrame.
df.show()

+----------+---------+--------------+-------------+
|first_name|last_name|       address|date_of_birth|
+----------+---------+--------------+-------------+
|   Charlie|   Wilson|  1011 Pine St|   2004-09-21|
|     Grace|   Taylor|1213 Willow St|   1996-08-20|
|     Henry|    Davis|1213 Willow St|   1996-02-07|
|     Grace|   Wilson|    456 Elm St|   1997-06-04|
|   Charlie|    Davis|    456 Elm St|   1990-05-09|
|   Charlie|   Miller|1213 Willow St|   1978-10-05|
|     Grace|   Wilson|    456 Elm St|   1995-01-14|
|  Isabella|    Jones|1213 Willow St|   1974-07-30|
|     Grace|    Jones|    456 Elm St|   1970-08-02|
|     Frank| Williams|    789 Oak St|   1981-05-16|
|     Henry|   Taylor|    456 Elm St|   2017-08-27|
|     Frank|    Smith|   123 Main St|   2007-10-16|
|     Grace|   Taylor|    789 Oak St|   1970-12-17|
|     Emily| Williams|1213 Willow St|   2019-11-13|
|     Grace|    Brown|    789 Oak St|   2000-04-21|
|      Jack| Anderson|  1011 Pine St|   1973-05-29|
|     David|

In [8]:
import string

# Load the generated CSV back in, taking column names from the header row and
# letting Spark infer each column's type.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("generated_data.csv")
)

def generate_random_string(length=8):
    """Return `length` characters drawn at random from ASCII letters and digits.

    Characters are sampled with replacement, so repeats are possible.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)

def anonymize_first_name(name):
    """Return a placeholder first name: "First_" followed by a random token.

    The incoming `name` is not used, so nothing of the original value survives.
    """
    token = generate_random_string()
    return "First_" + token

def anonymize_last_name(name):
    """Return a placeholder last name: "Last_" followed by a random token.

    The incoming `name` is not used, so nothing of the original value survives.
    """
    token = generate_random_string()
    return "Last_" + token

def anonymize_address(addr):
    """Return a placeholder address: "Address_" followed by a random token.

    The incoming `addr` is not used, so nothing of the original value survives.
    """
    token = generate_random_string()
    return "Address_" + token

# Wrap the anonymizers as Spark UDFs returning strings. Each call produces a new
# random token, so the UDFs are flagged non-deterministic: Spark treats a UDF as
# deterministic by default, and the optimizer may then drop or repeat invocations.
anonymize_first_name_udf = udf(anonymize_first_name, StringType()).asNondeterministic()
anonymize_last_name_udf = udf(anonymize_last_name, StringType()).asNondeterministic()
anonymize_address_udf = udf(anonymize_address, StringType()).asNondeterministic()

# Replace the three identifying columns in place; any other column is left as read.
anonymized_df = (
    df
    .withColumn("first_name", anonymize_first_name_udf(col("first_name")))
    .withColumn("last_name", anonymize_last_name_udf(col("last_name")))
    .withColumn("address", anonymize_address_udf(col("address")))
)

# Save the anonymized rows as CSV with a header row, overwriting earlier output.
anonymized_df.write.csv("anonymized_data.csv", header=True, mode="overwrite")




                                                                                

In [9]:
# Print a preview of the anonymized DataFrame.
anonymized_df.show()

+--------------+-------------+----------------+-------------+
|    first_name|    last_name|         address|date_of_birth|
+--------------+-------------+----------------+-------------+
|First_3xrNuzqc|Last_dlinSst9|Address_aieTJz5c|   1998-10-02|
|First_bTM8gwFN|Last_KgRhDceQ|Address_p869COPM|   1972-08-16|
|First_FrOu1Nla|Last_MfusHojF|Address_DgWSUTO8|   1978-10-27|
|First_VHMTsdtE|Last_00ueD6jc|Address_YQrhjJEb|   1991-11-02|
|First_q06gch2T|Last_ohmFD3hm|Address_Pvp9ddA7|   2020-06-14|
|First_KTvQJjV0|Last_mNlbSJvK|Address_a12lmJMw|   1997-05-27|
|First_4ZFe9iFF|Last_1Dt74Ggg|Address_Q622kIGM|   2011-09-07|
|First_KfibNOix|Last_BxszeGC9|Address_G1HqDVTR|   2010-09-02|
|First_bToyQsAj|Last_2qPxBqid|Address_aXFZY3yM|   1977-09-17|
|First_2xCIZgM7|Last_WVHRH3Gu|Address_ZGO3hh4w|   1975-10-02|
|First_CrYHzQZB|Last_d8kV4It2|Address_imiT85NZ|   1981-02-01|
|First_aCm3mJho|Last_GcpSaXHR|Address_TbH8JOuZ|   1979-10-01|
|First_LjZb1Mob|Last_hpgD0rOV|Address_ftusVWKi|   2021-12-30|
|First_o

Trying the same approach on a dataset downloaded from Kaggle, anonymizing every column except the 'FinalGrade' column.

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import random
import string


import os

# Get the Spark session; getOrCreate() hands back the running one if it exists.
spark = SparkSession.builder.appName("kaggle_student_dataset").getOrCreate()

# Location of the Kaggle student-performance CSV. Set the STUDENT_CSV_PATH
# environment variable to point at a local copy; when it is unset, the path the
# notebook was originally written against is used.
student_csv_path = os.environ.get(
    "STUDENT_CSV_PATH",
    "/Users/manikantaboddu/Downloads/student_performance.csv",
)
df = spark.read.csv(student_csv_path, header=True, inferSchema=True)

def generate_random_string(length=8):
    """Build a random token of `length` ASCII letters and digits (repeats allowed)."""
    pool = string.ascii_letters + string.digits
    chars = random.choices(pool, k=length)
    return ''.join(chars)


def anonymize_column(value):
    """Return a placeholder: "Anonymized_" followed by a random token.

    The incoming `value` is not used, so the result carries nothing of the original.
    """
    token = generate_random_string()
    return "Anonymized_" + token


# The anonymizer returns a different random token on every call, so the UDF is
# flagged non-deterministic; Spark assumes UDFs are deterministic unless told
# otherwise and may then drop or repeat invocations while optimizing.
anonymize_udf = udf(anonymize_column, StringType()).asNondeterministic()

# Anonymize all the fields excluding 'FinalGrade'
columns_to_anonymize = [col_name for col_name in df.columns if col_name != 'FinalGrade']
anonymized_df = df
for col_name in columns_to_anonymize:
    # Overwrite the column under its own name with string placeholders.
    anonymized_df = anonymized_df.withColumn(col_name, anonymize_udf(col(col_name)))

# Save the result as CSV with a header row, overwriting earlier output.
anonymized_df.write.csv("anonymized_Kaggle_student_data.csv", header=True, mode="overwrite")


24/09/01 13:58:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Instead of naming the excluded column explicitly, the code below picks the columns to exclude by position and anonymizes all the remaining columns.

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import random
import string

import os

# Get the Spark session; getOrCreate() hands back the running one if it exists.
spark = SparkSession.builder.appName("Anonymization").getOrCreate()

# Location of the Kaggle student-performance CSV. Set the STUDENT_CSV_PATH
# environment variable to point at a local copy; when it is unset, the path the
# notebook was originally written against is used.
student_csv_path = os.environ.get(
    "STUDENT_CSV_PATH",
    "/Users/manikantaboddu/Downloads/student_performance.csv",
)
df = spark.read.csv(student_csv_path, header=True, inferSchema=True)

def generate_random_string(length=8):
    """Return a random alphanumeric (ASCII) string with `length` characters."""
    symbols = string.ascii_letters + string.digits
    sampled = random.choices(symbols, k=length)
    return ''.join(sampled)

def anonymize_column(value):
    """Return "Anonymized_" plus a random token; the input `value` is not used."""
    suffix = generate_random_string()
    return "Anonymized_" + suffix

# The anonymizer returns a different random token on every call, so the UDF is
# flagged non-deterministic; Spark assumes UDFs are deterministic unless told
# otherwise and may then drop or repeat invocations while optimizing.
anonymize_udf = udf(anonymize_column, StringType()).asNondeterministic()

# Exclude the 2nd and 3rd columns from anonymization. A slice is used rather than
# indexing so that a file with fewer than three columns does not raise IndexError.
columns_to_exclude = df.columns[1:3]

# Every remaining column, in its original order, gets anonymized.
columns_to_anonymize = [col_name for col_name in df.columns if col_name not in columns_to_exclude]

anonymized_df = df
for col_name in columns_to_anonymize:
    # Overwrite the column under its own name with string placeholders.
    anonymized_df = anonymized_df.withColumn(col_name, anonymize_udf(col(col_name)))

# Save the result as CSV with a header row, overwriting earlier output.
anonymized_df.write.csv("anonymized_student_data.csv", header=True, mode="overwrite")
