### Built using
- Anaconda Navigator
- Jupyter Notebook
- Python and Python libraries

### Data Processing 
- Generate a csv file containing first_name, last_name, address, date_of_birth
- Process the csv file to anonymise the data
- Columns to anonymise are first_name, last_name and address
- You might be thinking that is silly
- Now make this work on a 2GB CSV file (should be doable on a laptop)
- Demonstrate that the same can work on a bigger dataset
- Hint - You would need some distributed computing platform

In [None]:
# Cell to check and install necessary packages in Jupyter Notebook

import importlib
import sys

# Packages this notebook depends on. Each name is used both as the module
# name passed to importlib.import_module and as the name handed to pip, so
# it must be valid for both.
packages = [
    'pandas',
    'faker',
    'pyspark'
]

# Function to check and install packages
def install_packages(pkg_list):
    """Make sure every package in ``pkg_list`` is importable, installing it if not.

    Each name is first imported with ``importlib``. When that raises
    ``ImportError`` the package is installed with pip, run as
    ``<current interpreter> -m pip`` so that it is installed for the
    interpreter executing this code rather than for whichever ``pip``
    happens to be first on the shell's PATH. A failed installation is
    reported and the remaining packages are still processed.

    Args:
        pkg_list: Iterable of package names. Each name is used both as the
            import name and as the name given to ``pip install``.
    """
    # Local import: only this function needs subprocess.
    import subprocess

    for pkg in pkg_list:
        try:
            importlib.import_module(pkg)
        except ImportError:
            print(f'{pkg} not found. Installing...')
            # Plain Python instead of the IPython-only "!pip" shell escape.
            # Output is captured so it can be shown in the notebook cell.
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', pkg],
                capture_output=True,
                text=True,
                check=False,
            )
            if result.returncode == 0:
                # Let the import system notice the freshly installed files.
                importlib.invalidate_caches()
                print(f'{pkg} installed.')
            else:
                print(f'Failed to install {pkg}:\n{result.stderr}')
        else:
            print(f'{pkg} is already installed.')

# Run the function
install_packages(packages)

In [None]:
import pandas as pd
import subprocess
import os
import faker
import hashlib
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, col
from concurrent.futures import ThreadPoolExecutor

### Step 1: Generate a CSV File

Creating a sample CSV file with the following columns:
- `first_name`
- `last_name`
- `address`
- `date_of_birth`

This CSV file will serve as the input for the subsequent data processing steps. Ensure that the file contains a sufficient amount of data for testing purposes.

In [None]:
# Single Faker instance; generate_chunk reads this module-level name for
# every fake value it produces
fake = faker.Faker()

# Size of the generated dataset and of each generation batch
num_rows = 10**6  # 1 million rows in total
chunk_size = 10**5  # Rows per generate_chunk call (100,000), i.e. 10 chunks with these values

# Function to generate a chunk of data
def generate_chunk(start_index, end_index):
    """Build a DataFrame of fake person records for one slice of the dataset.

    Args:
        start_index: First row index of the slice (inclusive).
        end_index: Last row index of the slice (exclusive).

    Returns:
        A pandas DataFrame with ``end_index - start_index`` rows and the
        columns ``first_name``, ``last_name``, ``address`` and
        ``date_of_birth``.
    """
    # The indices only determine how many rows are produced.
    row_span = range(start_index, end_index)

    # Columns are generated one after another, each as a full list.
    first_names = [fake.first_name() for _ in row_span]
    last_names = [fake.last_name() for _ in row_span]
    # Any newline inside a generated address is replaced so it stays on one line.
    addresses = [fake.address().replace('\n', ', ') for _ in row_span]
    birth_dates = [
        fake.date_of_birth(minimum_age=18, maximum_age=90) for _ in row_span
    ]

    return pd.DataFrame({
        'first_name': first_names,
        'last_name': last_names,
        'address': addresses,
        'date_of_birth': birth_dates,
    })

# Destination of the generated dataset
csv_file_path = 'large_dataset.csv'

# The output file and the worker pool share one with-statement; the pool is
# shut down before the file is closed, exactly as with nested blocks.
with open(csv_file_path, 'w', newline='', encoding='utf-8') as file, \
        ThreadPoolExecutor() as executor:
    # Only the first chunk written carries the column header
    header_written = False

    # Queue one generate_chunk task per [start, end) row range
    futures = [
        executor.submit(generate_chunk, start, min(start + chunk_size, num_rows))
        for start in range(0, num_rows, chunk_size)
    ]

    # Collect results in submission order so rows are written in sequence
    for future in futures:
        df_chunk = future.result()
        df_chunk.to_csv(file, mode='a', header=not header_written, index=False)
        header_written = True

print(f"Data generation complete. CSV file saved to {csv_file_path}.")

### Step 2: Load and Preview the CSV File
Load the generated CSV file and preview the data to ensure it has been generated correctly.

In [None]:
# Load the whole CSV file into a single in-memory DataFrame
# (no chunksize is passed here, so nothing is streamed)
df = pd.read_csv(csv_file_path)

# Preview the first few rows
df.head()

### Step 3: Anonymize Data
Anonymize the first_name, last_name, and address columns using hashing.

In [None]:
def anonymize_data(value):
    """Return the SHA-256 hex digest of ``value``, leaving missing values alone.

    Args:
        value: The cell value to anonymize. Strings are hashed as their
            UTF-8 encoding; any other non-missing value is converted with
            ``str()`` first.

    Returns:
        A 64-character lowercase hexadecimal digest, or ``value`` itself
        when it is ``None`` or a float NaN (there is nothing to hide in a
        missing cell, and keeping it missing preserves that information).
    """
    # NaN is the only float that is not equal to itself.
    if value is None or (isinstance(value, float) and value != value):
        return value
    return hashlib.sha256(str(value).encode('utf-8')).hexdigest()

# Hash each personally identifying column in place
for pii_column in ('first_name', 'last_name', 'address'):
    df[pii_column] = df[pii_column].apply(anonymize_data)

# Preview the anonymized data
df.head()

### Step 4: Save Anonymized Data
Save the anonymized DataFrame to a new CSV file.

In [None]:
# Destination of the anonymized output
anonymized_csv_file_path = 'anonymized_large_dataset.csv'

try:
    df.to_csv(anonymized_csv_file_path, index=False)
except Exception as write_error:
    # Report the failure instead of aborting the notebook run
    print(f"Failed to save anonymized data to CSV file: {write_error}")
else:
    # Reached only when the write completed without raising
    print(f"Anonymized data successfully saved to {anonymized_csv_file_path}.")

### Step 5: Handling Large Datasets
For a large file (for example, a 2GB file), the CSV can be read and processed in chunks.

In [None]:
chunk_size = 10**5  # Process 100,000 rows at a time, can change as required. 

with pd.read_csv(csv_file_path, chunksize=chunk_size) as reader:
    for i, chunk in enumerate(reader):
        # Hash the identifying columns of this chunk only
        for column in ('first_name', 'last_name', 'address'):
            chunk[column] = chunk[column].apply(anonymize_data)

        # The first chunk creates the file and writes the header;
        # every later chunk is appended without one.
        is_first_chunk = i == 0
        chunk.to_csv(
            anonymized_csv_file_path,
            mode='w' if is_first_chunk else 'a',
            header=is_first_chunk,
            index=False,
        )

### Step 6: Scaling with Distributed Computing
For larger datasets, this process can be scaled using a distributed computing platform such as Apache Spark. A full solution would define input, processing and output stages (a general ETL pipeline or a medallion architecture); here, however, the same large_dataset.csv generated in the steps above is reused and its columns are anonymized by hashing.

In [None]:
# Define file paths
csv_file_path = 'large_dataset.csv'
# Spark treats this path as an output directory and writes part files into it
hashed_csv_file_path = 'hased_large_dataset.csv'

# Bound before the try block so the cleanup below can tell whether a session
# was ever created.
spark = None

try:
    # Initialize Spark session
    spark = SparkSession.builder.appName("HashedData").getOrCreate()

    # Load the CSV file into a Spark DataFrame. Every column is read as a
    # string: the hashed columns are text anyway, the remaining columns are
    # passed through as read, and schema inference would cost an additional
    # pass over the data.
    df_spark = spark.read.csv(csv_file_path, header=True)

    # Anonymize the columns with SHA-256
    for column_name in ("first_name", "last_name", "address"):
        df_spark = df_spark.withColumn(column_name, sha2(col(column_name), 256))

    # Save the anonymized DataFrame back to CSV, replacing any previous output
    df_spark.write.mode("overwrite").csv(hashed_csv_file_path, header=True)

    print(f"Hashed data successfully saved to {hashed_csv_file_path}.")

except Exception as e:
    # Handle any exceptions that occur during Spark operations
    print(f"Failed to process and save hashed data: {e}")

finally:
    # Stop the session only if one was actually started
    if spark is not None:
        try:
            spark.stop()
            print("Spark session stopped.")
        except Exception as stop_error:
            print(f"Failed to stop the Spark session: {stop_error}")