In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, when, lit, udf, sha2, regexp_replace
# Step 1: Initialize Spark Session
# getOrCreate() hands back the already-running session when the notebook has one.
spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

# Step 2: Load Raw Data
# inferSchema=True lets Spark guess each column's type from the data, so
# digit-only identifiers may be loaded as numbers instead of strings.
raw_data_path = "/content/Sample_Data_For_Data_Engineering_UseCase (1).csv"  # Update with actual path
raw_df = spark.read.csv(raw_data_path, header=True, inferSchema=True)

# Step 3: Data Validation
# Fail fast, with one clear error, when a column the pipeline references by name
# is absent. "Location" (the group-by key) and "PhoneNumber" (masked in Step 5)
# are listed as well: without them the run would only fail later, inside Spark,
# with a far less helpful analysis error.
required_columns = [
    "OrderID",
    "Quantity",
    "Price",
    "CreditCardNumber",
    "Location",
    "PhoneNumber",
]
available_columns = set(raw_df.columns)
# The loop variable is deliberately not called `col`, to avoid confusion with
# the pyspark.sql.functions.col helper imported at the top of the cell.
missing_columns = [name for name in required_columns if name not in available_columns]
if missing_columns:
    raise ValueError(f"Missing required columns: {missing_columns}")

# Step 4: Data Transformation
# Defaults substituted for nulls in the descriptive (non-numeric) columns.
placeholder_values = {
    "CustomerName": "Unknown",
    "PhoneNumber": "000-000-0000",
    "Location": "Unknown",
    "Country": "Unknown",
}
# A row with a null in any of these columns is discarded.
mandatory_fields = ["OrderID", "Price", "Quantity"]

# Total_Sales is the per-row product Quantity * Price; placeholders are filled
# in next, and rows missing a mandatory field are dropped last.
transformed_df = (
    raw_df.withColumn("Total_Sales", col("Quantity") * col("Price"))
    .fillna(placeholder_values)
    .na.drop(subset=mandatory_fields)
)

# Per-location rollup: summed sales, mean Total_Sales per row, summed quantity.
location_metrics = [
    _sum("Total_Sales").alias("Total_Sales"),
    avg("Total_Sales").alias("Average_Order_Value"),
    _sum("Quantity").alias("Total_Quantity"),
]
aggregated_df = transformed_df.groupBy("Location").agg(*location_metrics)

# Step 5: Handle PII Data
# Anonymize CreditCardNumber with a SHA-256 digest, then drop the original.
# The column is cast to string first: the CSV is read with inferSchema=True, so
# a digit-only card number may arrive as a numeric column, and sha2() expects
# string/binary input. For a column that is already a string the cast is a
# no-op, so the resulting digest is unchanged.
# NOTE(review): an unsalted hash of a card number can be reversed by enumerating
# the limited space of valid numbers; prefer a keyed hash (HMAC) or tokenization
# if this output leaves a trusted environment.
pii_anonymized_df = transformed_df.withColumn(
    "CreditCard_Anonymized", sha2(col("CreditCardNumber").cast("string"), 256)
).drop("CreditCardNumber")  # Drop original column for security

# Mask phone numbers, leaving only the last four characters readable.
# The lookahead uses `{4,}` (four OR MORE characters up to the end): with the
# exact count `{4}` only the single character sitting five from the end matches,
# e.g. "123-456-7890" became "123-456*7890" instead of "********7890".
# The raw PhoneNumber column is dropped afterwards, mirroring CreditCardNumber,
# so the anonymized output no longer carries the unmasked value.
pii_anonymized_df = pii_anonymized_df.withColumn(
    "Masked_PhoneNumber", regexp_replace(col("PhoneNumber"), r".(?=.{4,}$)", "*")
).drop("PhoneNumber")

# Step 6: Save Outputs
# Destinations are relative directories; update them with your desired paths.
transformed_output_path = "transformed_data"
aggregated_output_path = "aggregated_data"
pii_output_path = "anonymized_data"

# Each frame is written as Parquet, replacing whatever a previous run left behind.
# NOTE(review): transformed_df is derived from raw_df without dropping any
# column, so the "transformed_data" output still contains the raw
# CreditCardNumber (and PhoneNumber, when present). Confirm that is intended.
for output_df, output_path in (
    (transformed_df, transformed_output_path),  # row-level transformed data
    (aggregated_df, aggregated_output_path),    # per-location aggregates
    (pii_anonymized_df, pii_output_path),       # anonymized PII data
):
    output_df.write.mode("overwrite").parquet(output_path)

print("Pipeline completed successfully.")

Pipeline completed successfully.


In [None]:
from pyspark.sql.functions import col, regexp_replace

# Mask credit card numbers, leaving only the last four digits readable.
#
# NOTE: this cell does not create `sales_df`; it must already exist when the
# cell runs (the only definition visible in this notebook is in a later cell).
#
# Step 1 strips every non-digit, so numbers written with spaces or dashes are
# masked too (the previous `\d{12}` pattern never matched them).
# Step 2 matches the WHOLE value and keeps just the final four digits. The
# previous unanchored, fixed-width pattern left more than four digits visible
# for numbers longer than 16 digits and emitted no dash before the visible
# tail ("****-****-****5678").
# A null stays null. A value with fewer than four digits does not match and is
# left as its stripped digits.
card_digits = regexp_replace(col("CreditCardNumber").cast("string"), r"\D", "")
sales_df = sales_df.withColumn(
    "Masked_CreditCardNumber",
    regexp_replace(card_digits, r"^\d*(\d{4})$", "****-****-****-$1")
)


In [None]:
pip install pycryptodome


Collecting pycryptodome
  Downloading pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Downloading pycryptodome-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycryptodome
Successfully installed pycryptodome-3.21.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from Crypto.Cipher import AES
import base64

# Initialize Spark Session (an already-running session is reused)
spark = (
    SparkSession.builder
    .appName("EncryptCreditCard")
    .getOrCreate()
)

# Sample data (replace with your actual DataFrame):
# one single-field row per card number, under a single named column.
columns = ["CreditCardNumber"]
data = [
    ("1234567812345678",),
    ("8765432187654321",),
]
sales_df = spark.createDataFrame(data, schema=columns)

# Encryption setup
def create_cipher(key=None):
    """Build the AES cipher used to encrypt card numbers.

    Args:
        key: AES key as bytes (16, 24 or 32 bytes long). When omitted, the
            notebook's built-in 16-byte demo key is used, so existing
            zero-argument calls behave exactly as before.

    Returns:
        A new AES cipher object in ECB mode.

    Raises:
        TypeError: if ``key`` is not ``bytes`` or ``bytearray``.
        ValueError: if ``key`` is not 16, 24 or 32 bytes long.

    SECURITY:
        * The default key is a literal in the source and therefore not a
          secret. For real data, pass a key obtained from a secrets manager.
        * ECB is deterministic and encrypts each 16-byte block independently:
          equal plaintext blocks give equal ciphertext blocks. Prefer an
          authenticated mode with a per-value nonce for real card data.
    """
    if key is None:
        key = b'sixteen byte key'  # demo-only default, exactly 16 bytes
    if not isinstance(key, (bytes, bytearray)):
        raise TypeError(f"AES key must be bytes, got {type(key).__name__}")
    if len(key) not in (16, 24, 32):
        raise ValueError(
            f"AES key must be 16, 24 or 32 bytes long, got {len(key)}"
        )
    return AES.new(key, AES.MODE_ECB)

# Encrypt function
def encrypt_credit_card(card_number):
    """Encrypt one card number and return it as base64 text.

    Args:
        card_number: the value to encrypt. Non-string values (for example an
            integer column produced by schema inference) are converted with
            ``str()`` first. ``None`` is passed through.

    Returns:
        The base64-encoded ciphertext as ``str``, or ``None`` when
        ``card_number`` is ``None``.

    The plaintext is right-padded with spaces to 32 characters, as before.
    If the encoded result is still not a multiple of the 16-byte AES block
    size (input longer than 32 characters, or containing multi-byte
    characters), it is padded with further spaces up to the next block
    boundary instead of letting the cipher raise. Every input that encrypted
    successfully before yields the same ciphertext.

    NOTE: space padding cannot be told apart from genuine trailing spaces when
    the value is decrypted.
    """
    if card_number is None:
        return None

    block_size = 16  # AES block size in bytes
    plaintext = str(card_number).ljust(32).encode()  # Pad to at least 32 characters
    overflow = len(plaintext) % block_size
    if overflow:
        # Top up to the next block boundary; ECB rejects unaligned input.
        plaintext += b" " * (block_size - overflow)

    cipher = create_cipher()  # Created per call, inside the function
    encrypted = cipher.encrypt(plaintext)
    return base64.b64encode(encrypted).decode()

# Wrap the Python encryptor as a Spark UDF that returns a string column.
encrypt_udf = udf(f=encrypt_credit_card, returnType=StringType())

# Add the ciphertext next to the original card number column.
encrypted_card_column = encrypt_udf(col("CreditCardNumber"))
sales_df = sales_df.withColumn("Encrypted_CreditCardNumber", encrypted_card_column)

# Print the rows without truncating the (long) base64 values.
sales_df.show(truncate=False)


+----------------+--------------------------------------------+
|CreditCardNumber|Encrypted_CreditCardNumber                  |
+----------------+--------------------------------------------+
|1234567812345678|6Q/X/fWXV/6SQO2dxUx1gJSmapPeeNYogG594rrVZL8=|
|8765432187654321|+TbhuQfthIflnWFN89PV1ZSmapPeeNYogG594rrVZL8=|
+----------------+--------------------------------------------+

