# Data Transformation and Testing
This notebook transforms the 2021 orders dataset with PySpark. It names the columns and splits the `Product` column into `ProductName` and `ProductDetails`, then validates the written output with `assert`-based checks.

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, trim

## Initialize Spark Session
Create a Spark session to work with the dataset.

In [2]:
# Start (or reuse) the Spark session used by the transformation cells below;
# getOrCreate returns the already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("Data Transformation")
    .getOrCreate()
)

## Load the Dataset
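Load the raw dataset from `datasets/2021.csv`. The file has no header row, so Spark assigns placeholder column names, which are replaced in the next steps.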

In [3]:
# Read the raw orders file. It has no header row, so Spark names the
# columns positionally (_c0, _c1, ...); real names are applied further down.
df = (
    spark.read
    .format("csv")
    .option("header", False)
    .load("datasets/2021.csv")
)

## Define Schema
Define the column names for the dataset, in the order the columns appear in the CSV. Only names are defined here, not data types.

In [4]:
# Column names for the header-less source CSV, listed in file order.
# Names only; no data types are attached here.
schema = [
    "OrderID",
    "OrderLine",
    "OrderDate",
    "CustomerName",
    "Email",
    "Product",
    "Quantity",
    "Price",
    "Tax",
]

## Apply Schema
Rename the DataFrame's columns to the names defined above, matched by position.

In [5]:
# Apply schema to DataFrame: replace the positional column names with the
# names in `schema`, matched by position.
# NOTE(review): toDF needs exactly one name per column, so this assumes the
# CSV has len(schema) == 9 columns; confirm against the data file.
df = df.toDF(*schema)

## Split Product Column
Split the `Product` column into `ProductName` and `ProductDetails`.

In [6]:
# Split Product column into ProductName and ProductDetails.
# Judging by the displayed output, values look like "Road-250 Black, 48";
# TODO confirm against the raw file. The split is computed once and reused.
product_parts = split(col("Product"), ",")
# trim() removes whitespace left around each piece by the split (e.g. the
# space after the comma), so ProductDetails is "48" rather than " 48".
# Under Spark's default (non-ANSI) settings, a value with no comma yields a
# null ProductDetails.
df = df.withColumn("ProductName", trim(product_parts[0])) \
       .withColumn("ProductDetails", trim(product_parts[1]))

## Drop Original Product Column
Drop the original `Product` column from the DataFrame.

In [7]:
# Drop the original Product column; its value has been split into
# ProductName and ProductDetails in the previous cell.
df = df.drop("Product")

## Show Transformed DataFrame
Display the transformed DataFrame.

In [8]:
# Preview the transformed DataFrame: first 20 rows, long values truncated.
# NOTE(review): the rendered output contains customer names and emails;
# redact it before sharing this notebook.
df.show(n=20, truncate=True)

+--------+---------+----------+-------------+----------------------------+--------+---------+--------+----------------+---------------+
| OrderID|OrderLine| OrderDate| CustomerName|                       Email|Quantity|    Price|     Tax|      ProductName| ProductDetails|
+--------+---------+----------+-------------+----------------------------+--------+---------+--------+----------------+---------------+
| SO49171|        1|2021-01-01|Mariah Foster|mariah21@adventure-works.com|       1|2181.5625| 174.525|   Road-250 Black|             48|
| SO49172|        1|2021-01-01| Brian Howard| brian23@adventure-works.com|       1| 2443.35 |195.468 |     Road-250 Red|             44|
| SO49173|        1|2021-01-01| Linda Alvarez| linda19@adventure-works.com|       1|2071.4196|165.7136|Mountain-200 Silver|             38|
| SO49174|        1|2021-01-01|Gina Hernandez| gina4@adventure-works.com|       1|2071.4196|165.7136|Mountain-200 Silver|             42|
| SO49178|        1|2021-01-01|    Beth

## Save Transformed DataFrame
Save the transformed DataFrame as CSV. Spark writes a directory named `transformed_2021.csv` containing one or more part files, not a single file.

In [9]:
# Save the transformed DataFrame as CSV with a header row. Spark writes a
# directory at this path containing part files, not a single .csv file.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") raises as soon as the output directory already exists.
df.write.mode("overwrite").csv("datasets/transformed_2021.csv", header=True)

## Stop Spark Session
Stop the Spark session.

In [10]:
# Stop the Spark session used for the transformation; the testing section
# below creates its own session.
spark.stop()

# Data Transformation Testing
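Validate the transformation with plain `assert` statements that run directly in the notebook cells. `pytest` is imported, but the checks are not run through a pytest session.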

In [11]:
# Import necessary libraries for testing
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

## Initialize Spark Session for Testing
Create a Spark session for testing.

In [12]:
# Start a Spark session for the validation checks. When the notebook runs
# top to bottom, the transformation session has been stopped, so
# getOrCreate builds a new one here.
spark = (
    SparkSession.builder
    .appName("Data Transformation Test")
    .getOrCreate()
)

## Load Transformed Dataset
Load the transformed dataset for testing.

In [13]:
# Read back the CSV output written by the transformation (a directory of
# part files). The files carry header rows, so column names come from them.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("datasets/transformed_2021.csv")
)

## Test Schema
Validate the column names, and their order, in the transformed dataset.

In [14]:
# Test schema: the transformed data must have exactly these columns, in this
# order. These are the original columns minus Product, followed by the two
# columns derived from it.
expected_schema = [
    "OrderID", "OrderLine", "OrderDate", "CustomerName", "Email",
    "Quantity", "Price", "Tax", "ProductName", "ProductDetails",
]
assert df.columns == expected_schema, (
    f"Unexpected columns: {df.columns}; expected {expected_schema}"
)

## Test Product Split
Validate the split of the `Product` column into `ProductName` and `ProductDetails`.

In [15]:
# Test product split on a known order: ProductName and ProductDetails must
# hold the two comma-separated parts of that order's Product value.
sample_row = df.filter(col("OrderID") == "SO49171").first()
# first() returns None when nothing matches; fail with a clear message
# instead of the bare IndexError that collect()[0] would raise.
assert sample_row is not None, "Order SO49171 not found in transformed data"
assert sample_row["ProductName"] == "Road-250 Black", (
    f"ProductName was {sample_row['ProductName']!r}"
)
# ProductDetails may be null (Product had no comma) or carry whitespace left
# by the split; normalise before comparing so a null fails as an assertion.
assert (sample_row["ProductDetails"] or "").strip() == "48", (
    f"ProductDetails was {sample_row['ProductDetails']!r}"
)

## Test Row Count
Validate that the transformed dataset has the same number of rows as the original dataset.

In [16]:
# Test row count: the transformation only renames, splits and drops columns,
# so the output must contain exactly as many rows as the raw source file.
# The source is counted directly, using the same read options as the
# transformation step (header=False), instead of hard-coding a number that
# goes stale whenever the input changes.
original_row_count = spark.read.csv("datasets/2021.csv", header=False).count()
transformed_row_count = df.count()
assert transformed_row_count == original_row_count, (
    f"Row count changed: {transformed_row_count} transformed vs "
    f"{original_row_count} original"
)

## Test Data Integrity
Validate the integrity of the data in the transformed dataset.

In [17]:
# Test data integrity: the non-Product fields of a known order must pass
# through the transformation unchanged. Values are compared as strings
# because neither CSV read supplies a schema or enables inferSchema.
sample_row = df.filter(col("OrderID") == "SO49171").first()
assert sample_row is not None, "Order SO49171 not found in transformed data"
expected_values = {
    "CustomerName": "Mariah Foster",
    "Email": "mariah21@adventure-works.com",
    "Quantity": "1",
    "Price": "2181.5625",
    "Tax": "174.525",
}
actual_values = {name: sample_row[name] for name in expected_values}
# A single dict comparison reports every mismatching field at once.
assert actual_values == expected_values, f"Mismatched fields: {actual_values}"

## Stop Spark Session for Testing
Stop the Spark session after testing.

In [18]:
# Stop the Spark session used for the validation checks.
spark.stop()