# Data Engineering Interview Exercise Notebook
 
---
 
## 📖 Scenario
 
You are a Data Engineer at an international e-commerce company. Your task is to build a data pipeline that processes daily transaction data, enriches it with customer information, and produces an aggregated report to identify top-spending customers.
 
You have been provided with two CSV files:
- `transactions.csv`: Transaction details (transaction_id, customer_id, product_id, quantity, price, date).
- `customers.csv`: Customer details (customer_id, name, email, join_date).
 
---

## 🛠️ Step-by-Step Instructions
 
Complete each step below. You may choose either Pandas or PySpark for this exercise.
 
### ⚙️ Step 1: Data Ingestion
- Load both CSV files into DataFrames.
 
**Hint**: 
- Pandas: `pd.read_csv()`
- PySpark: `spark.read.csv()`
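A minimal sketch of a more defensive Pandas load, assuming the column layout listed in the scenario. An in-memory buffer holding two rows copied from the sample output stands in for `data/transactions.csv`. The sketch parses `date` as a datetime and pins the ID and quantity columns to integers, so a malformed file fails loudly instead of silently producing `object` columns. The `dtypes` mapping is an illustrative assumption, not part of the exercise spec.

```python
import io

import pandas as pd

# Two rows copied from the sample output; io.StringIO stands in for the real file path.
csv_text = """transaction_id,customer_id,product_id,quantity,price,date
1001,2001,3001,2,19.99,2024-01-01
1002,2002,3003,1,49.99,2024-01-01
"""

# Assumed schema: integer IDs and quantities, float prices, ISO-8601 dates.
dtypes = {
    "transaction_id": "int64",
    "customer_id": "int64",
    "product_id": "int64",
    "quantity": "int64",
    "price": "float64",
}

# read_csv raises if a value cannot be cast to the requested dtype.
transactions = pd.read_csv(io.StringIO(csv_text), dtype=dtypes, parse_dates=["date"])
print(transactions.dtypes)
```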

In [4]:
# Import for Pandas users
import pandas as pd

In [1]:
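# Imports for PySpark users
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum  # note: this `sum` shadows Python's built-in sum()

# .getOrCreate() is required to actually start (or reuse) the Spark session
spark = SparkSession.builder.appName("EcommerceETL").getOrCreate()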

In [11]:
# Pandas
transactions = pd.read_csv('data/transactions.csv')
customers = pd.read_csv('data/customers.csv')
transactions.head()


|   | transaction_id | customer_id | product_id | quantity | price | date |
|---|---|---|---|---|---|---|
| 0 | 1001 | 2001 | 3001 | 2 | 19.99 | 2024-01-01 |
| 1 | 1002 | 2002 | 3003 | 1 | 49.99 | 2024-01-01 |
| 2 | 1003 | 2001 | 3002 | 3 | 9.99 | 2024-01-02 |
| 3 | 1004 | 2003 | 3001 | 1 | 19.99 | 2024-01-02 |
| 4 | 1005 | 2004 | 3004 | 2 | 29.99 | 2024-01-03 |


In [12]:
customers.head()

|   | customer_id | name | email | join_date |
|---|---|---|---|---|
| 0 | 2001 | John Doe | johndoe@example.com | 2023-06-15 |
| 1 | 2002 | Jane Smith | janesmith@example.com | 2023-07-20 |
| 2 | 2002 | Jane Smith | janesmith@example.com | 2023-07-20 |
| 3 | 2003 | Bob Johnson | bobjohnson@example.com | 2023-08-10 |
| 4 | 2004 | Alice Williams | alicewilliams@example.com | 2023-09-05 |


In [None]:
# PySpark
transactions_spark = spark.read.csv('data/transactions.csv', header=True, inferSchema=True)
customers_spark = spark.read.csv('data/customers.csv', header=True, inferSchema=True)

### 🧹 Step 2: Data Cleaning
- Remove duplicates from both datasets.
 
**Hint**: 
- Pandas: `drop_duplicates()`
- PySpark: `dropDuplicates()`
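Full-row and key-based deduplication behave differently when a key repeats with slightly different attribute values. The sketch below uses a hypothetical variant of the customer rows in which customer 2002 reappears with a changed email. Full-row `drop_duplicates()` keeps both rows, while `drop_duplicates(subset='customer_id')` keeps only the first. Deduplicating on the key is what protects the later join, because a repeated `customer_id` in `customers` would duplicate every matching transaction.

```python
import pandas as pd

# Hypothetical customer rows: 2002 repeats with a different (invented) email address.
customers = pd.DataFrame(
    {
        "customer_id": [2001, 2002, 2002, 2003],
        "name": ["John Doe", "Jane Smith", "Jane Smith", "Bob Johnson"],
        "email": [
            "johndoe@example.com",
            "janesmith@example.com",
            "jane.smith@example.com",  # hypothetical changed email
            "bobjohnson@example.com",
        ],
    }
)

# Full-row deduplication: only rows identical in every column are dropped.
full_row = customers.drop_duplicates()

# Key-based deduplication: keep the first row seen for each customer_id.
by_key = customers.drop_duplicates(subset="customer_id", keep="first")

removed = len(customers) - len(by_key)
print(f"full-row kept {len(full_row)} rows; key-based removed {removed} row(s)")

# Guard for downstream joins: each customer_id should now be unique.
assert by_key["customer_id"].is_unique
```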

In [13]:
# Pandas
transactions = transactions.drop_duplicates(subset='transaction_id')
customers = customers.drop_duplicates(subset='customer_id')
transactions.head()

|   | transaction_id | customer_id | product_id | quantity | price | date |
|---|---|---|---|---|---|---|
| 0 | 1001 | 2001 | 3001 | 2 | 19.99 | 2024-01-01 |
| 1 | 1002 | 2002 | 3003 | 1 | 49.99 | 2024-01-01 |
| 2 | 1003 | 2001 | 3002 | 3 | 9.99 | 2024-01-02 |
| 3 | 1004 | 2003 | 3001 | 1 | 19.99 | 2024-01-02 |
| 4 | 1005 | 2004 | 3004 | 2 | 29.99 | 2024-01-03 |


In [14]:
customers.head()

|   | customer_id | name | email | join_date |
|---|---|---|---|---|
| 0 | 2001 | John Doe | johndoe@example.com | 2023-06-15 |
| 1 | 2002 | Jane Smith | janesmith@example.com | 2023-07-20 |
| 3 | 2003 | Bob Johnson | bobjohnson@example.com | 2023-08-10 |
| 4 | 2004 | Alice Williams | alicewilliams@example.com | 2023-09-05 |
| 5 | 2005 | Michael Brown | michaelbrown@example.com | 2023-10-12 |


In [None]:
# PySpark
transactions_spark = transactions_spark.dropDuplicates(['transaction_id'])
customers_spark = customers_spark.dropDuplicates(['customer_id'])

### 🔗 Step 3: Data Joining
- Join transaction data with customer data on `customer_id`.
 
**Hint**:
- Pandas: `merge()`
- PySpark: `.join()`
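An inner join silently discards transactions whose `customer_id` has no match, so it can be worth auditing the join before committing to it. This is a minimal sketch using hypothetical toy rows (transaction 1006 and customer 2999 are invented for illustration). `validate='many_to_one'` makes Pandas raise `MergeError` if the customer key is not unique, and `indicator=True` adds a `_merge` column so unmatched transactions can be counted.

```python
import pandas as pd

# Hypothetical toy data: transaction 1006 references a customer missing from `customers`.
transactions = pd.DataFrame(
    {
        "transaction_id": [1001, 1002, 1006],
        "customer_id": [2001, 2002, 2999],
        "quantity": [2, 1, 4],
        "price": [19.99, 49.99, 5.00],
    }
)
customers = pd.DataFrame({"customer_id": [2001, 2002], "name": ["John Doe", "Jane Smith"]})

# Left join with an indicator column: "both" = matched, "left_only" = no customer found.
audit = transactions.merge(
    customers, on="customer_id", how="left", validate="many_to_one", indicator=True
)
orphans = audit[audit["_merge"] == "left_only"]
print(f"{len(orphans)} transaction(s) would be dropped by an inner join")

# Keeping only matched rows reproduces the inner join used in the notebook.
joined = audit[audit["_merge"] == "both"].drop(columns="_merge")

# If customers were not deduplicated, validate="many_to_one" raises instead of fanning out rows.
dup_customers = pd.concat([customers, customers.iloc[[1]]], ignore_index=True)
validation_failed = False
try:
    transactions.merge(dup_customers, on="customer_id", validate="many_to_one")
except pd.errors.MergeError as exc:
    validation_failed = True
    print("validation failed:", exc)
```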

In [16]:
# Pandas
joined_df = transactions.merge(customers, on='customer_id', how='inner')
joined_df.head()

|   | transaction_id | customer_id | product_id | quantity | price | date | name | email | join_date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1001 | 2001 | 3001 | 2 | 19.99 | 2024-01-01 | John Doe | johndoe@example.com | 2023-06-15 |
| 1 | 1002 | 2002 | 3003 | 1 | 49.99 | 2024-01-01 | Jane Smith | janesmith@example.com | 2023-07-20 |
| 2 | 1003 | 2001 | 3002 | 3 | 9.99 | 2024-01-02 | John Doe | johndoe@example.com | 2023-06-15 |
| 3 | 1004 | 2003 | 3001 | 1 | 19.99 | 2024-01-02 | Bob Johnson | bobjohnson@example.com | 2023-08-10 |
| 4 | 1005 | 2004 | 3004 | 2 | 29.99 | 2024-01-03 | Alice Williams | alicewilliams@example.com | 2023-09-05 |


In [None]:
# PySpark
joined_spark_df = transactions_spark.join(customers_spark, 'customer_id', 'inner')

In [None]:
# Spark SQL
transactions_spark.createOrReplaceTempView("transactions")
customers_spark.createOrReplaceTempView("customers")
 
joined_sql_df = spark.sql("""
SELECT t.*, c.name, c.email, c.join_date
FROM transactions t
INNER JOIN customers c
ON t.customer_id = c.customer_id
""")

### 📊 Step 4: Data Aggregation
- Calculate the total amount spent (`quantity * price`) per customer.
- Find the top 5 customers based on total spending.
 
**Hint**:
- Pandas: Use `groupby()` and aggregate with `.sum()`
- PySpark: Use `groupBy()` and aggregation functions (`sum()`)
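The same aggregation can also be written as a single chain using named aggregation and `DataFrame.nlargest`, which avoids mutating the joined DataFrame and rounds the currency total. This sketch uses only the first four joined rows from the sample output, so its totals differ from the full-dataset result. The intermediate column name `line_total` is an illustrative choice.

```python
import pandas as pd

# First four joined rows from the sample output (only the columns the aggregation needs).
joined = pd.DataFrame(
    {
        "customer_id": [2001, 2002, 2001, 2003],
        "name": ["John Doe", "Jane Smith", "John Doe", "Bob Johnson"],
        "quantity": [2, 1, 3, 1],
        "price": [19.99, 49.99, 9.99, 19.99],
    }
)

top = (
    joined.assign(line_total=joined["quantity"] * joined["price"])  # per-row spend
    .groupby(["customer_id", "name"], as_index=False)
    .agg(total_spent=("line_total", "sum"))  # named aggregation
    .round({"total_spent": 2})  # round to cents
    .nlargest(5, "total_spent")  # top 5 by spend, descending
    .reset_index(drop=True)
)
print(top)
```

`nlargest` expresses "sort descending, take the first N" in one call; with `keep='first'` (the default), ties are broken by original row order.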

In [17]:
# Pandas
joined_df['total_spent'] = joined_df['quantity'] * joined_df['price']
top_customers = (joined_df.groupby(['customer_id', 'name'])['total_spent'].sum()
                 .reset_index()
                 .sort_values(by='total_spent', ascending=False)
                 .head(5))
top_customers

|   | customer_id | name | total_spent |
|---|---|---|---|
| 4 | 2005 | Michael Brown | 249.95 |
| 2 | 2003 | Bob Johnson | 169.96 |
| 3 | 2004 | Alice Williams | 99.96 |
| 0 | 2001 | John Doe | 99.94 |
| 1 | 2002 | Jane Smith | 59.98 |


In [None]:
# PySpark
joined_spark_df = joined_spark_df.withColumn('total_spent', col('quantity') * col('price'))
top_customers_spark = (joined_spark_df.groupBy('customer_id', 'name')
                       .agg(sum('total_spent').alias('total_spent'))
                       .orderBy(col('total_spent').desc())
                       .limit(5))

In [None]:
# Spark SQL
joined_sql_df.createOrReplaceTempView("joined_data")
top_customers_sql = spark.sql("""
SELECT customer_id, name, SUM(quantity * price) AS total_spent
FROM joined_data
GROUP BY customer_id, name
ORDER BY total_spent DESC
LIMIT 5
""")

### 📁 Step 5: Export Results
- Export the aggregated results as a CSV file named `top_customers.csv`.
 
**Hint**:
- Pandas: `to_csv()`
- PySpark: `write.csv()`
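A quick round-trip check confirms the exported file reads back with the expected columns. This sketch writes into a temporary directory, and the two-row frame is an illustrative stand-in for `top_customers`. It also shows why `index=False` matters: when the index is written, `read_csv` returns it as an extra column named `Unnamed: 0`.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Illustrative stand-in for the aggregated `top_customers` result.
top_customers = pd.DataFrame(
    {"customer_id": [2001, 2002], "name": ["John Doe", "Jane Smith"], "total_spent": [69.95, 49.99]}
)

with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "top_customers.csv"
    top_customers.to_csv(out_path, index=False)  # index omitted from the file
    round_trip = pd.read_csv(out_path)

    # Writing with the default index=True adds an unnamed leading column.
    top_customers.to_csv(Path(tmp) / "with_index.csv")
    with_index = pd.read_csv(Path(tmp) / "with_index.csv")

print(round_trip)
print(list(with_index.columns))
```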

In [7]:
# Pandas
top_customers.to_csv('top_customers.csv', index=False)

In [None]:
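# PySpark: Spark writes a *directory* named top_customers.csv containing part files;
# coalesce(1) yields a single part file, and mode('overwrite') replaces any existing path
top_customers_spark.coalesce(1).write.mode('overwrite').csv('top_customers.csv', header=True)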