#Making a 50 Million Rows Dataset

In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm

# Parameters
TOTAL_RECORDS = 50_000_000
CHUNK_SIZE = 1_000_000  # Adjust based on your system's memory
OUTPUT_FILE = 'Bank_data.csv'

# Define possible values
genders = ['Male', 'Female']
purchased_options = ['Y', 'N']

# Comprehensive list of country names (195 countries as per United Nations)
countries = [
    'Afghanistan', 'Albania', 'Algeria', 'Andorra', 'Angola', 'Antigua and Barbuda',
    'Argentina', 'Armenia', 'Australia', 'Austria', 'Azerbaijan', 'Bahamas', 'Bahrain',
    'Bangladesh', 'Barbados', 'Belarus', 'Belgium', 'Belize', 'Benin', 'Bhutan',
    'Bolivia', 'Bosnia and Herzegovina', 'Botswana', 'Brazil', 'Brunei', 'Bulgaria',
    'Burkina Faso', 'Burundi', 'Côte d\'Ivoire', 'Cabo Verde', 'Cambodia', 'Cameroon',
    'Canada', 'Central African Republic', 'Chad', 'Chile', 'China', 'Colombia',
    'Comoros', 'Congo (Congo-Brazzaville)', 'Costa Rica', 'Croatia', 'Cuba', 'Cyprus',
    'Czechia (Czech Republic)', 'Democratic Republic of the Congo', 'Denmark', 'Djibouti',
    'Dominica', 'Dominican Republic', 'Ecuador', 'Egypt', 'El Salvador',
    'Equatorial Guinea', 'Eritrea', 'Estonia', 'Eswatini (fmr. "Swaziland")',
    'Ethiopia', 'Fiji', 'Finland', 'France', 'Gabon', 'Gambia', 'Georgia', 'Germany',
    'Ghana', 'Greece', 'Grenada', 'Guatemala', 'Guinea', 'Guinea-Bissau', 'Guyana',
    'Haiti', 'Holy See', 'Honduras', 'Hungary', 'Iceland', 'India', 'Indonesia',
    'Iran', 'Iraq', 'Ireland', 'Israel', 'Italy', 'Jamaica', 'Japan', 'Jordan',
    'Kazakhstan', 'Kenya', 'Kiribati', 'Kuwait', 'Kyrgyzstan', 'Laos',
    'Latvia', 'Lebanon', 'Lesotho', 'Liberia', 'Libya', 'Liechtenstein', 'Lithuania',
    'Luxembourg', 'Madagascar', 'Malawi', 'Malaysia', 'Maldives', 'Mali', 'Malta',
    'Marshall Islands', 'Mauritania', 'Mauritius', 'Mexico', 'Micronesia',
    'Moldova', 'Monaco', 'Mongolia', 'Montenegro', 'Morocco', 'Mozambique',
    'Myanmar (formerly Burma)', 'Namibia', 'Nauru', 'Nepal', 'Netherlands',
    'New Zealand', 'Nicaragua', 'Niger', 'Nigeria', 'North Korea', 'North Macedonia',
    'Norway', 'Oman', 'Pakistan', 'Palau', 'Palestine State', 'Panama',
    'Papua New Guinea', 'Paraguay', 'Peru', 'Philippines', 'Poland', 'Portugal',
    'Qatar', 'Romania', 'Russia', 'Rwanda', 'Saint Kitts and Nevis', 'Saint Lucia',
    'Saint Vincent and the Grenadines', 'Samoa', 'San Marino', 'Sao Tome and Principe',
    'Saudi Arabia', 'Senegal', 'Serbia', 'Seychelles', 'Sierra Leone', 'Singapore',
    'Slovakia', 'Slovenia', 'Solomon Islands', 'Somalia', 'South Africa',
    'South Korea', 'South Sudan', 'Spain', 'Sri Lanka', 'Sudan', 'Suriname',
    'Sweden', 'Switzerland', 'Syria', 'Taiwan', 'Tajikistan', 'Tanzania', 'Thailand',
    'Timor-Leste', 'Togo', 'Tonga', 'Trinidad and Tobago', 'Tunisia', 'Turkey',
    'Turkmenistan', 'Tuvalu', 'Uganda', 'Ukraine', 'United Arab Emirates',
    'United Kingdom', 'United States of America', 'Uruguay', 'Uzbekistan',
    'Vanuatu', 'Venezuela', 'Vietnam', 'Yemen', 'Zambia', 'Zimbabwe'
]

# Function to generate a data chunk
def generate_chunk(chunk_size):
    # Age: 19 to 88 with 15% nulls
    age = np.random.randint(19, 89, size=chunk_size).astype(float)
    age_null_mask = np.random.rand(chunk_size) < 0.15
    age[age_null_mask] = np.nan

    # Salary: 25000 to 98000 with 10% nulls
    salary = np.random.randint(25000, 98001, size=chunk_size).astype(float)
    salary_null_mask = np.random.rand(chunk_size) < 0.10
    salary[salary_null_mask] = np.nan

    # Gender: Male or Female
    gender = np.random.choice(genders, size=chunk_size)

    # Country: Random from list, with some 'unknown'
    country_null_mask = np.random.rand(chunk_size) < 0.05  # 5% set to 'unknown'
    country = np.random.choice(countries, size=chunk_size)
    country[country_null_mask] = 'unknown'

    # Purchased: Y or N
    purchased = np.random.choice(purchased_options, size=chunk_size)

    # Create DataFrame
    df = pd.DataFrame({
        'Age': age,
        'Salary': salary,
        'Gender': gender,
        'Country': country,
        'Purchased': purchased
    })

    return df

# Initialize CSV file with headers
header = True
with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
    pass  # Just to ensure the file is created/overwritten

# Calculate the number of full chunks and the remainder
num_full_chunks = TOTAL_RECORDS // CHUNK_SIZE
remainder = TOTAL_RECORDS % CHUNK_SIZE

print(f"Generating {TOTAL_RECORDS} records in {num_full_chunks + (1 if remainder else 0)} chunks...")

# Generate and write data in chunks
for i in tqdm(range(num_full_chunks + (1 if remainder else 0)), desc="Generating data"):
    current_chunk_size = CHUNK_SIZE if i < num_full_chunks else remainder
    if current_chunk_size == 0:
        break  # No more data to generate

    df_chunk = generate_chunk(current_chunk_size)

    # Write to CSV
    df_chunk.to_csv(OUTPUT_FILE, mode='a', index=False, header=header)
    header = False  # Only write header for the first chunk

print(f"Data generation complete. Saved to {OUTPUT_FILE}")


Generating 50000000 records in 50 chunks...


Generating data: 100%|██████████| 50/50 [02:58<00:00,  3.58s/it]

Data generation complete. Saved to Bank_data.csv





In [2]:
import pandas as pd
import numpy as np
df = pd.read_csv('Bank_data.csv')

In [3]:
df.sample(40)

Unnamed: 0,Age,Salary,Gender,Country,Purchased
47714103,27.0,36486.0,Female,Palau,N
28448813,47.0,79852.0,Female,Iraq,Y
29944160,86.0,32349.0,Female,Russia,N
42058072,,57672.0,Male,Slovakia,N
7506258,,35329.0,Female,Nigeria,N
17277286,83.0,81335.0,Female,Burkina Faso,N
25395186,65.0,86772.0,Female,Barbados,N
24874637,33.0,27636.0,Female,Kuwait,N
33481555,51.0,45403.0,Male,Jordan,Y
31472501,57.0,91152.0,Female,Tajikistan,N


## Install JDK
## Install Spark
## Set Environment variables
## Create a Spark Session

In [8]:
# Install Java 11
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download latest Spark
!wget https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz

# Extract Spark
!tar -xvf spark-3.5.4-bin-hadoop3.tgz  # Changed from 3.5.0 to 3.5.4

# Install findspark
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"  # Changed from 3.5.0 to 3.5.4

# Initialize Spark
import findspark
findspark.init()

# Create Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("SparkSession")\
    .getOrCreate()

--2025-01-06 18:35:19--  https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400879762 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.4-bin-hadoop3.tgz.1’


2025-01-06 18:35:22 (133 MB/s) - ‘spark-3.5.4-bin-hadoop3.tgz.1’ saved [400879762/400879762]

spark-3.5.4-bin-hadoop3/
spark-3.5.4-bin-hadoop3/R/
spark-3.5.4-bin-hadoop3/R/lib/
spark-3.5.4-bin-hadoop3/R/lib/SparkR/
spark-3.5.4-bin-hadoop3/R/lib/SparkR/INDEX
spark-3.5.4-bin-hadoop3/R/lib/SparkR/R/
spark-3.5.4-bin-hadoop3/R/lib/SparkR/R/SparkR
spark-3.5.4-bin-hadoop3/R/lib/SparkR/R/SparkR.rdb
spark-3.5.4-bin-hadoop3/R/lib/SparkR/R/SparkR.rdx
spark-3.5.4-bin-hadoop3/R/lib/SparkR/NAMESPACE
spark-3.5.4-bin-hadoop3/R/lib/SparkR/profile/
spark-3.5.4-bin-hadoop3/R/lib/SparkR/profile/shell.R
spark-3.5.4-bi

## Checking  if Dataframe getting created

In [42]:
spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}]).show()

+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



Importing the created Bank_data

In [43]:
bankProspectsDF = spark.read.csv("Bank_data.csv",header=True)


In [44]:
bankProspectsDF.show()

+----+-------+------+--------------------+---------+
| Age| Salary|Gender|             Country|Purchased|
+----+-------+------+--------------------+---------+
|76.0|69408.0|Female|             Georgia|        Y|
|75.0|45056.0|  Male|            Slovakia|        N|
|36.0|93969.0|Female|              Mexico|        N|
|31.0|75580.0|Female|              Brunei|        Y|
|NULL|91638.0|  Male|        Turkmenistan|        Y|
|85.0|86399.0|Female|    Marshall Islands|        Y|
|42.0|43406.0|Female|             Eritrea|        N|
|62.0|55741.0|Female|               Nauru|        Y|
|58.0|60165.0|Female|"Eswatini (fmr. "...|        Y|
|58.0|77509.0|  Male|             Uruguay|        N|
|NULL|43428.0|  Male|Democratic Republ...|        Y|
|66.0|55707.0|Female|              Kuwait|        N|
|NULL|26664.0|Female|Bosnia and Herzeg...|        N|
|77.0|   NULL|  Male|              Angola|        Y|
|NULL|   NULL|  Male|                Iran|        Y|
|79.0|34691.0|Female|             Lebanon|    

## Removing the records with unknow value in country column

In [13]:
import time
currect = time.time()
bankProspectsDF1 = bankProspectsDF.filter(bankProspectsDF['country'] != "unknown")
print(time.time() - currect)

0.020863056182861328


# Casting string to Integer/Float

In [16]:
bankProspectsDF1.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



In [17]:
from pyspark.sql.types import IntegerType,FloatType

In [18]:
bankProspectsDF2 = bankProspectsDF1.withColumn("age", bankProspectsDF1["age"].cast(IntegerType())).withColumn("salary", bankProspectsDF1["salary"].cast(FloatType()))


In [19]:
bankProspectsDF2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



## Replacing Age and Salary with average values of their respective column

In [20]:
from pyspark.sql.functions import mean

### Calculate mean Age value

In [21]:
mean_age_val = bankProspectsDF2.select(mean(bankProspectsDF2['age'])).collect()

In [22]:
mean_age_val

[Row(avg(age)=53.497455114536194)]

In [23]:
mean_age = mean_age_val[0][0]

In [24]:
mean_age


53.497455114536194

### Calculate mean salary value

In [25]:
mean_salary_val = bankProspectsDF2.select(mean(bankProspectsDF2['salary'])).collect()

In [26]:
mean_salary = mean_salary_val[0][0]

In [28]:
print("Mean Salary : ",mean_salary)

Mean Salary :  61500.69378660621


In [29]:
bankProspectsDF2.show()

+----+-------+------+--------------------+---------+
| age| salary|Gender|             Country|Purchased|
+----+-------+------+--------------------+---------+
|  76|69408.0|Female|             Georgia|        Y|
|  75|45056.0|  Male|            Slovakia|        N|
|  36|93969.0|Female|              Mexico|        N|
|  31|75580.0|Female|              Brunei|        Y|
|NULL|91638.0|  Male|        Turkmenistan|        Y|
|  85|86399.0|Female|    Marshall Islands|        Y|
|  42|43406.0|Female|             Eritrea|        N|
|  62|55741.0|Female|               Nauru|        Y|
|  58|60165.0|Female|"Eswatini (fmr. "...|        Y|
|  58|77509.0|  Male|             Uruguay|        N|
|NULL|43428.0|  Male|Democratic Republ...|        Y|
|  66|55707.0|Female|              Kuwait|        N|
|NULL|26664.0|Female|Bosnia and Herzeg...|        N|
|  77|   NULL|  Male|              Angola|        Y|
|NULL|   NULL|  Male|                Iran|        Y|
|  79|34691.0|Female|             Lebanon|    

In [38]:
current= time.time()
bankbankProspectsDF3 = bankProspectsDF2.na.fill(mean_age,["age"])
print(time.time() - current)

0.03772854804992676


### Replace missing age with salary value

In [33]:
bankbankProspectsDF4 = bankbankProspectsDF3.na.fill(mean_salary,["salary"])

In [34]:
bankbankProspectsDF4.show()

+---+---------+------+--------------------+---------+
|age|   salary|Gender|             Country|Purchased|
+---+---------+------+--------------------+---------+
| 76|  69408.0|Female|             Georgia|        Y|
| 75|  45056.0|  Male|            Slovakia|        N|
| 36|  93969.0|Female|              Mexico|        N|
| 31|  75580.0|Female|              Brunei|        Y|
| 53|  91638.0|  Male|        Turkmenistan|        Y|
| 85|  86399.0|Female|    Marshall Islands|        Y|
| 42|  43406.0|Female|             Eritrea|        N|
| 62|  55741.0|Female|               Nauru|        Y|
| 58|  60165.0|Female|"Eswatini (fmr. "...|        Y|
| 58|  77509.0|  Male|             Uruguay|        N|
| 53|  43428.0|  Male|Democratic Republ...|        Y|
| 66|  55707.0|Female|              Kuwait|        N|
| 53|  26664.0|Female|Bosnia and Herzeg...|        N|
| 77|61500.695|  Male|              Angola|        Y|
| 53|61500.695|  Male|                Iran|        Y|
| 79|  34691.0|Female|      

In [39]:
bankbankProspectsDF4.printSchema()

root
 |-- age: integer (nullable = true)
 |-- salary: float (nullable = false)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



## Write the transformed file to a new csv file

In [41]:
bankbankProspectsDF4.write.format("csv").save("bank_prospects_transformed")

In [45]:
! -ls

/bin/bash: line 1: -ls: command not found


In [46]:
!ls bank_prospects_transformed/

part-00000-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00001-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00002-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00003-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00004-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00005-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00006-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00007-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00008-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00009-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00010-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
part-00011-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv
_SUCCESS


In [None]:
!cat bank_prospects_transformed/part-00000-8d384ecb-f264-4f8a-a6ab-6913dc06dfa4-c000.csv

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
68,48806.0,Male,New Zealand,N
56,69322.0,Female,Norway,Y
42,48260.0,Female,Jordan,Y
57,53433.0,Female,Guatemala,N
47,27186.0,Female,India,Y
44,67503.0,Female,Tuvalu,Y
53,35635.0,Male,Central African Republic,Y
42,75506.0,Female,Botswana,N
63,72869.0,Male,Antigua and Barbuda,Y
37,52868.0,Male,United Arab Emirates,N
53,86221.0,Female,Cyprus,Y
33,74969.0,Male,Armenia,N
87,68490.0,Male,India,Y
74,61500.695,Male,Marshall Islands,N
74,26802.0,Male,Holy See,Y
57,61500.695,Male,Liberia,N
53,35345.0,Male,Paraguay,Y
42,38787.0,Male,Qatar,N
53,80887.0,Male,Costa Rica,Y
50,60247.0,Female,Montenegro,Y
81,74279.0,Male,North Korea,Y
53,31934.0,Male,Qatar,Y
84,53983.0,Male,Vanuatu,Y
48,31414.0,Male,Chad,N
37,81008.0,Female,Moldova,Y
24,56145.0,Female,Poland,Y
49,75578.0,Male,Guinea,Y
65,31943.0,Female,Ukraine,N
53,61500.695,Male,Togo,N
25,95327.0,Male,Kenya,N
35,52887.0,Male,Belarus,N
53,95172.0,Male,Pakistan,Y
82,67741.0,Female,Lebanon,