## **Install Libraries**

In [None]:
!pip install pyspark



## **Data Preparation and Partitioning:**
Load the "bank.csv" dataset into a Spark DataFrame and inspect the first few rows.

Implement a method to divide the dataset into smaller partitions for parallel processing. What strategy did you use for partitioning, and why?


**Step 1: Load the Dataset**

First, we will load the "bank.csv" dataset into a Spark DataFrame and inspect the first few rows.

**Step 2: Partitioning Strategy**

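We will use the repartition method to hash-partition the DataFrame on the balance column into 4 partitions: Spark assigns each row to a partition based on a hash of its balance value. Because balance has many distinct values, this spreads the rows fairly evenly. All rows that share a value (for example, every zero balance) still land in the same partition, so a heavily repeated value can cause some skew.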
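To see why the choice of key matters, here is a small pure-Python simulation of hash partitioning versus round-robin assignment. It is only an approximation: Spark's df.repartition(4, "balance") hashes keys with Murmur3, while this sketch uses Python's built-in hash(), so the actual partition ids differ. The balance values are hypothetical.

```python
from collections import Counter


def hash_partition(keys, num_partitions):
    """Send each key to partition hash(key) mod num_partitions.

    Python's hash() stands in for the Murmur3 hash Spark uses, so the partition ids
    differ from Spark's, but the property shown is the same: equal keys always co-locate.
    """
    return [hash(k) % num_partitions for k in keys]


def round_robin_partition(keys, num_partitions):
    """Deal rows out to partitions in turn, ignoring the key."""
    return [i % num_partitions for i in range(len(keys))]


def partition_sizes(assignments, num_partitions):
    """Count how many rows landed in each partition."""
    counts = Counter(assignments)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical balances: 0 repeats six times, the other values are distinct
balances = [0] * 6 + [1787, 4789, 1350, 1476, 215, 3012]

print(partition_sizes(hash_partition(balances, 4), 4))         # [8, 1, 1, 2]
print(partition_sizes(round_robin_partition(balances, 4), 4))  # [3, 3, 3, 3]
```

The six zero balances all fall into one partition, while round-robin assignment is perfectly even. In Spark, calling df.repartition(4) without a column gives round-robin behaviour, at the cost of no longer grouping equal balances together.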

**Step 3: Implement the Code**

In [None]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("DataParallelismBank") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/Distributed Machine Learning Project/bank.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Inspect the first few rows
df.show(5)

# Partition the dataset by the 'balance' column
partitioned_df = df.repartition(4, "balance")

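# Verify the number of partitions
print(f"Number of partitions: {partitioned_df.rdd.getNumPartitions()}")

# Count the rows in each partition to check how evenly the data was spread
print(f"Rows per partition: {partitioned_df.rdd.glom().map(len).collect()}")

# Show the first few rows of the repartitioned DataFrame
partitioned_df.show(5)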


+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30| unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|   services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35| management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30| management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|blue-collar|married|secondary|     no|      0|    yes|  no| unknown|  5|  may|     22

### **Explanation:**

**Spark Session:** We create a Spark session with specified configurations to control memory usage and the number of shuffle partitions.

**Load Dataset:** We load the "bank.csv" dataset into a Spark DataFrame.

**Inspect Data:** We display the first few rows of the DataFrame to understand its structure.

**Partition Data:** We repartition the DataFrame based on the balance column into 4 partitions. The number of partitions can be adjusted to the size of the dataset and the available resources; the Spark programming guide suggests 2-4 partitions per CPU core as a starting point.

**Verify Partitions:** We print the number of partitions to confirm the repartitioning.

**Why This Strategy?**

**Even Distribution:** Hash partitioning on a column with many distinct values, such as balance, spreads rows roughly evenly across the partitions, although rows that share a balance value are always placed together.

**Parallel Processing:** By partitioning the data, Spark can process each partition in parallel, which enhances the performance of data processing tasks.

**Scalability:** As the dataset grows, the same code scales by raising the partition count, so each partition stays a manageable size and more cores or executors can work on the data at once.
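The per-partition row counts (for example, the list returned by `partitioned_df.rdd.glom().map(len).collect()`) can be condensed into a single skew number. This is a minimal sketch; the 2.0 threshold is an arbitrary assumption, not a Spark default.

```python
def partition_skew(sizes):
    """Ratio of the largest partition to the mean partition size.

    1.0 means perfectly even; larger values mean one partition holds more than its share.
    """
    if not sizes:
        raise ValueError("sizes must contain at least one partition")
    mean = sum(sizes) / len(sizes)
    if mean == 0:
        return 1.0  # all partitions empty: treat as even
    return max(sizes) / mean


def is_skewed(sizes, threshold=2.0):
    """Flag skew when the largest partition exceeds threshold times the mean (threshold is arbitrary)."""
    return partition_skew(sizes) > threshold


print(partition_skew([3, 3, 3, 3]))  # 1.0
print(is_skewed([8, 1, 1, 2]))       # True (8 / 3 is about 2.67)
```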

## **Data Analysis and Processing in Parallel:**
Identify and calculate the average balance for each job category in the "bank.csv" dataset. Use parallel processing to perform this calculation. Describe your approach and the results.

Perform a parallel operation to identify the top 5 age groups in the dataset that have the highest loan amounts. Explain your methodology and present your findings.


Step 1: **Set Up Spark Session**

First, we need to set up a Spark session.

Step 2: **Load and Inspect the Dataset**

Load the "bank.csv" dataset into a Spark DataFrame and inspect the first few rows.

Step 3: **Calculate Average Balance for Each Job Category**

Use the groupBy and agg functions to calculate the average balance for each job category.

Step 4: **Identify Top 5 Age Groups with Highest Loan Amounts**

The dataset has no loan-amount column: loan is a yes/no flag that records whether a client has a personal loan. As the closest available measure, use the groupBy and agg functions to count the clients with a personal loan at each age, and then identify the 5 ages with the highest counts.

Step 5: **Implement the Code**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, sum as spark_sum, when

# Create Spark session
spark = SparkSession.builder \
    .appName("DataAnalysisBank") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/Distributed Machine Learning Project/bank.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Inspect the first few rows
df.show(5)

# Calculate average balance for each job category
avg_balance_per_job = df.groupBy("job").agg(avg("balance").alias("avg_balance"))
avg_balance_per_job.show()

# Count clients with a personal loan at each age
# ('loan' is a yes/no flag in bank.csv, not a loan amount)
loans_per_age = df.groupBy("age").agg(
    spark_sum(when(col("loan") == "yes", 1).otherwise(0)).alias("num_loans")
)

# Identify the top 5 ages with the most personal loans
top_5_age_groups = loans_per_age.orderBy(col("num_loans").desc()).limit(5)
top_5_age_groups.show()

# Stop the Spark session
spark.stop()


+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30| unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|   services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35| management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30| management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|blue-collar|married|secondary|     no|      0|    yes|  no| unknown|  5|  may|     22

### **Explanation:**

**Spark Session:**

We create a Spark session with configurations for memory and shuffle partitions.

**Load Dataset:**

Load the "bank.csv" dataset into a Spark DataFrame.

**Inspect Data:**

Display the first few rows to understand the data structure.

**Average Balance per Job:**

Use the groupBy and agg functions to calculate the average balance for each job category. The result is displayed using show().

**Loan Holders per Age Group:**

Group the data by age and count the clients whose loan value is "yes": the when expression maps each "yes" to 1 and each "no" to 0, and spark_sum adds them up.

**Top 5 Age Groups with Highest Loan Counts:**

Order the results by num_loans in descending order and limit the output to the top 5 ages.

### **Results:**
**Average Balance for Each Job Category:**

This will display the average account balance for each job category.

**Top 5 Age Groups with Highest Loan Counts:**

This will display the five ages with the most clients holding a personal loan.
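Because loan in bank.csv is a yes/no flag rather than an amount, the pure-Python sketch below counts loan holders per age and keeps the top k. It mirrors a Spark groupBy/agg followed by orderBy(...).limit(5), which Spark typically plans as a top-K operation, so each partition keeps only its best candidates before the final merge. The optional bucket_width parameter is a hypothetical addition (not in the notebook) showing one reading of "age groups" as ranges such as 30-39.

```python
import heapq
from collections import Counter


def top_age_groups_by_loans(rows, k=5, bucket_width=None):
    """Return the k age groups with the most clients holding a personal loan.

    rows: iterable of (age, loan) pairs, where loan is "yes" or "no" as in bank.csv.
    bucket_width: optional (hypothetical) bucket size; 10 groups ages into 20-29, 30-39, ...
    """
    counts = Counter()
    for age, loan in rows:
        group = age if bucket_width is None else (age // bucket_width) * bucket_width
        if loan == "yes":
            counts[group] += 1
    # Highest count first; ties broken by the smaller age so the result is deterministic
    return heapq.nsmallest(k, counts.items(), key=lambda item: (-item[1], item[0]))


# The first five rows shown above plus two hypothetical extra rows
rows = [(30, "no"), (33, "yes"), (35, "no"), (30, "yes"), (59, "no"), (33, "yes"), (41, "yes")]
print(top_age_groups_by_loans(rows, k=2))              # [(33, 2), (30, 1)]
print(top_age_groups_by_loans(rows, bucket_width=10))  # [(30, 3), (40, 1)]
```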

### **Approach:**

**Parallel Processing:**

By using groupBy and agg, Spark aggregates each partition in a separate task running in parallel, then shuffles the partial results by key and combines them, leveraging the distributed computing power of the cluster.
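Conceptually, an average is computed in two phases: each task keeps a running sum and count per key for its own partition, and after the shuffle those partial results are merged and divided once. The pure-Python sketch below imitates that flow on hypothetical rows; it is not Spark's code, and it also shows why averaging the per-partition averages would give the wrong answer.

```python
from collections import defaultdict
from functools import reduce


def partial_sums(partition):
    """Per-partition step: turn (job, balance) rows into {job: (sum, count)}."""
    acc = defaultdict(lambda: [0, 0])
    for job, balance in partition:
        acc[job][0] += balance
        acc[job][1] += 1
    return {job: (s, c) for job, (s, c) in acc.items()}


def merge_partials(a, b):
    """Combine two partial results by adding sums and counts key by key."""
    merged = dict(a)
    for job, (s, c) in b.items():
        s0, c0 = merged.get(job, (0, 0))
        merged[job] = (s0 + s, c0 + c)
    return merged


def average_by_key(partitions):
    """Final step: merge all partials, then divide sum by count once per key."""
    combined = reduce(merge_partials, (partial_sums(p) for p in partitions), {})
    return {job: s / c for job, (s, c) in combined.items()}


# Two hypothetical partitions; 'management' appears twice in the first one
partitions = [
    [("management", 1350), ("management", 1476), ("services", 4789)],
    [("management", 0), ("unemployed", 1787)],
]
# 942.0, not the 706.5 you get by averaging the two partition averages
print(average_by_key(partitions)["management"])
```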

**Efficiency:**

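Because only small per-key partial results cross the shuffle, the amount of data moved depends mainly on the number of distinct keys (job categories or ages) rather than on the number of rows, which keeps these aggregations efficient on large datasets.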

## **Model Training on Partitioned Data:**

Choose a classification model to predict whether a client will subscribe to a term deposit (target variable 'y').

Briefly explain why you selected this model.

Partition the dataset into training and testing sets and train your model on these partitions.

Discuss any challenges you faced in parallelizing the training process and how you addressed them.


### **Model Selection:**

For this task, I will use a Random Forest Classifier to predict whether a client will subscribe to a term deposit (target variable 'y'). Here's the detailed process:

**Random Forest Classifier:**

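I selected the Random Forest Classifier because it is a robust and versatile classification algorithm that typically achieves good accuracy on tabular data with a mix of numerical and categorical features, and Spark MLlib trains it in a distributed way. It handles feature interactions naturally, is less prone to overfitting than an individual decision tree, and reports feature importances that help with interpretation. It does not correct class imbalance on its own, which matters here because far fewer clients subscribe ("yes") than not ("no").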
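One common remedy for the imbalance, not used in this notebook, is to weight rows inversely to class frequency. The sketch computes the same "balanced" weights that scikit-learn's class_weight="balanced" uses. In Spark 3.0 and later, such weights could be stored in a column and passed to RandomForestClassifier through its weightCol parameter (check the API docs for your Spark version). The labels below are hypothetical.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Weight each class by n_samples / (n_classes * n_samples_in_class)."""
    counts = Counter(labels)
    n, k = len(labels), len(counts)
    return {label: n / (k * c) for label, c in counts.items()}


# Hypothetical 8:2 split between "no" and "yes"
labels = ["no"] * 8 + ["yes"] * 2
weights = balanced_class_weights(labels)
print(weights)  # {'no': 0.625, 'yes': 2.5}
```

With these weights each class contributes the same total weight (8 x 0.625 = 2 x 2.5 = 5), so the minority class is no longer drowned out.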

### **Data Partitioning and Model Training:**

**Partitioning the Data:**

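We will split the dataset into training (80%) and testing (20%) sets using Spark's randomSplit method. Rows are assigned to each split at random, so the split sizes are approximate, and both splits keep the parent DataFrame's partitions, so each can be processed in parallel.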
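randomSplit does not cut the data at exact 80/20 row counts: Spark samples rows within each partition using the seed, so split sizes vary slightly and the class mix is not stratified. The pure-Python sketch below imitates that per-row sampling idea; it is not Spark's actual implementation. If a stratified split on y were wanted, DataFrame.sampleBy, which takes per-class fractions, is one option.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, in the spirit of DataFrame.randomSplit.

    Weights are normalized to sum to 1. Each row draws u from [0, 1) and goes to the
    first split whose cumulative weight exceeds u, so split sizes are only approximate.
    """
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding in the last bound
    return splits


train, test = random_split(range(1000), [0.8, 0.2], seed=1234)
print(len(train), len(test))  # close to 800 and 200, but rarely exactly
```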

**Training the Model:**

We'll use the training set to fit the Random Forest model and evaluate its performance on the testing set.

### **Challenges and Solutions:**

**Parallel Processing:**

Ensuring that the training process utilizes the parallel processing capabilities of Spark. This involves correctly partitioning the data and leveraging Spark's MLlib for distributed model training.

**Handling Categorical Variables:**

Using StringIndexer to convert categorical variables to numerical values for model training.
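StringIndexer's default ordering explains why the indexed label works with BinaryClassificationEvaluator: the majority class "no" becomes 0.0 and "yes" becomes 1.0. The pure-Python sketch mimics that frequency ordering and the three handleInvalid modes ("error", the default, "skip", and "keep"); it is an illustration, not Spark's code. Because the pipeline's feature indexers are fitted on the training split only, setting handleInvalid="keep" would guard against a category that happens to appear only in the test split.

```python
from collections import Counter


def fit_string_indexer(values):
    """Mimic StringIndexer's default stringOrderType="frequencyDesc".

    The most frequent string gets index 0.0; equal counts are ordered alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


def apply_indexer(mapping, values, handle_invalid="error"):
    """Map strings to indices, treating unseen strings like StringIndexer's handleInvalid options."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))  # extra bucket at index numLabels
        elif handle_invalid == "skip":
            continue  # drop the row
        else:
            raise ValueError(f"Unseen label: {v!r}")
    return out


label_map = fit_string_indexer(["no", "no", "yes", "no"])
print(label_map)                                           # {'no': 0.0, 'yes': 1.0}
print(apply_indexer(label_map, ["yes", "maybe"], "keep"))  # [1.0, 2.0]
```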

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session
spark = SparkSession.builder \
    .appName("BankTermDepositPrediction") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/Distributed Machine Learning Project/bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values (most frequent first: 'no' -> 0.0, 'yes' -> 1.0)
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

# Save the trained model (overwrite() allows re-running the cell when the path already exists)
model.write().overwrite().save("/content/drive/MyDrive/Distributed Machine Learning Project/bank_rf_model_new")

# Stop the Spark session
spark.stop()


### **Explanation:**

**Spark Session:**

We set up a Spark session with 4 shuffle partitions and 2 GB each of executor and driver memory.

**Load Dataset:**

The dataset is loaded into a Spark DataFrame.

**Handle Categorical Variables:**

We use StringIndexer to convert categorical variables to numerical values.

**Rename Target Column:**

The target variable 'y' is renamed to 'label'.

**Convert Label Column:**

The label column is converted to numerical values using StringIndexer.

**Assemble Features:**

We use VectorAssembler to combine all features into a single vector column.

**Initialize and Train the Model:**

A Random Forest classifier is initialized and trained using the pipeline.

**Evaluate the Model:**

The model is evaluated on the test set using the AUC metric.
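To read the reported Test AUC: the area under the ROC curve equals the probability that a randomly chosen positive ("yes") example receives a higher score than a randomly chosen negative one, so 0.5 is chance level and 1.0 is a perfect ranking. BinaryClassificationEvaluator computes it from the ROC curve built over the rawPrediction column rather than by comparing pairs; this quadratic pure-Python sketch only makes the meaning concrete, on hypothetical scores.

```python
def auc_pairwise(labels, scores):
    """AUC as the probability that a random positive outscores a random negative (ties count half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical labels and scores
print(auc_pairwise([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```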

**Save the Model:**

The trained model is saved for future use.

### **Challenges and Solutions:**

**Handling Categorical Variables:**

The categorical variables were handled using StringIndexer to convert them to numerical values required for the Random Forest algorithm.

**Parallel Processing:**

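Spark MLlib's RandomForestClassifier is already distributed: while growing the trees, it computes split statistics in parallel across the partitions of the training data and combines them to choose the best splits, so no extra parallelization code was needed.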

**Resource Management:**

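Setting executor and driver memory explicitly keeps the memory budget predictable. When Spark runs in local mode (the default here, since no master is set), the executor runs inside the driver JVM, so spark.driver.memory is the setting that actually takes effect.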

## **Resource Monitoring and Management:**
Implement resource monitoring during data processing and model training. What observations did you make regarding CPU and memory usage?

To monitor and manage resources effectively during data processing and model training in a Spark environment, you can use several tools and techniques.

These include using Spark’s built-in UI, leveraging external monitoring tools, and analyzing Spark logs.

**Using Spark Event Logging**

Enable Spark event logging to capture detailed information about your Spark application’s execution.


### **Resource Monitoring Observations**

The Spark UI (and, after the run, the event log) exposes the following metrics during the execution of a Spark job; the notes below describe what they typically show:

**CPU Usage:** Typically spikes during task execution phases, especially during data shuffling and model training stages.

**Memory Usage:** Key metrics to monitor include executor memory usage and garbage collection (GC) times. High memory usage or frequent GC can indicate the need for memory optimization or tuning.

**Disk I/O:** Observed during stages involving data read/write operations, especially during shuffling or saving models to disk.

**Network I/O:** Significant during data shuffling between nodes.
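The event log written to event_log_dir is a text file with one JSON event per line, so CPU, GC, and memory observations can be pulled out of it after the run. The helper below is a sketch, not a Spark API: it sums metrics from SparkListenerTaskEnd events using key names as they appear in Spark's JSON event format ("Executor Run Time" and "JVM GC Time" in milliseconds, "Executor CPU Time" in nanoseconds, "Peak Execution Memory" in bytes). Verify the keys against your own log, since they can change between versions.

```python
import json


def summarize_task_metrics(lines):
    """Aggregate a few task metrics from Spark event-log lines (one JSON object per line).

    Only "SparkListenerTaskEnd" events are counted; other events are skipped.
    """
    totals = {"tasks": 0, "run_time_ms": 0, "cpu_time_ns": 0, "gc_time_ms": 0, "peak_exec_mem_bytes": 0}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        metrics = event.get("Task Metrics") or {}
        totals["tasks"] += 1
        totals["run_time_ms"] += metrics.get("Executor Run Time", 0)
        totals["cpu_time_ns"] += metrics.get("Executor CPU Time", 0)
        totals["gc_time_ms"] += metrics.get("JVM GC Time", 0)
        totals["peak_exec_mem_bytes"] = max(totals["peak_exec_mem_bytes"], metrics.get("Peak Execution Memory", 0))
    return totals


def ratios(totals):
    """CPU share and GC share of total task run time (both 0.0 if nothing ran)."""
    run_ms = totals["run_time_ms"]
    if run_ms == 0:
        return {"cpu_share": 0.0, "gc_share": 0.0}
    return {"cpu_share": totals["cpu_time_ns"] / 1e6 / run_ms, "gc_share": totals["gc_time_ms"] / run_ms}
```

To use it, open the file Spark created in event_log_dir for this application (one file per application, named after the application ID) and pass the file object to summarize_task_metrics, then to ratios. A low cpu_share suggests tasks spend much of their time waiting on I/O or shuffles; a high gc_share points to memory pressure.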

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Path to the event log directory
event_log_dir = "/content/drive/MyDrive/eventlog"

# Create the event log directory if it doesn't exist
if not os.path.exists(event_log_dir):
    os.makedirs(event_log_dir)

# Create Spark session with resource monitoring enabled
spark = SparkSession.builder \
    .appName("BankTermDepositPredictionlog") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", event_log_dir) \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/Distributed Machine Learning Project/bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


# Stop the Spark session
spark.stop()


Test AUC: 0.878627450980392


### **Explanation:**
**Event Log Directory Creation:**

The code checks whether the event log directory (/content/drive/MyDrive/eventlog) exists and creates it if it does not, because Spark fails at startup when spark.eventLog.dir points to a missing directory.

**Spark Session Configuration:**

spark.eventLog.enabled turns on event logging, and spark.eventLog.dir points it at that directory.

**Data Preparation and Model Training:**

The rest of the code loads the dataset, processes it, trains a Random Forest classifier and evaluates the model.

**Stopping the Spark Session:**

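Calling spark.stop() releases the session's resources after training and evaluation. It also finalizes the event log: while the application is running, Spark writes the log file with an .inprogress suffix and removes the suffix when the application stops.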

## **Task Management and Scheduling:**

Manage multiple parallel tasks, such as different preprocessing tasks. How did you ensure the effective management of these tasks?


**To effectively manage and schedule multiple parallel preprocessing tasks in a Spark application, we need to consider how to optimize resource utilization and task scheduling. Spark provides mechanisms to manage parallelism and task scheduling through configurations and best practices.**

Here's an approach to ensure effective management of parallel preprocessing tasks:

Define the tasks: Clearly define the preprocessing tasks that need to be performed.

Configure Spark for parallelism: Use Spark's configurations to optimize resource utilization.

Use DataFrame transformations: Leverage Spark's DataFrame transformations for parallel processing.

Manage dependencies: Ensure tasks that depend on each other are executed in the correct order.

Monitor and tune performance: Monitor the resource usage and performance of your Spark jobs and tune configurations as needed.
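The "Manage dependencies" step can be made explicit with a dependency graph. The sketch below uses Python's standard graphlib (3.9+) to run, in waves, every task whose prerequisites are finished, submitting independent tasks concurrently from a thread pool. The graph is our reading of the four preprocessing functions in the code cell that follows, and run_task is a hypothetical stand-in. With a real SparkSession, concurrency only helps for actions (for example, each StringIndexer.fit launches a job): jobs submitted from separate threads can run at the same time, and setting spark.scheduler.mode to FAIR shares executors between them instead of the default FIFO order.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task maps to the set of tasks that must finish before it can start
DEPENDENCIES = {
    "rename_label_column": set(),
    "index_label_column": {"rename_label_column"},
    "index_categorical_columns": set(),
    "assemble_features": {"index_categorical_columns"},
}


def run_in_waves(dependencies, run_task, max_workers=2):
    """Run every task whose prerequisites are done, concurrently; return the waves in order."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # also raises CycleError if the dependencies are circular
    waves = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorted(sorter.get_ready())
            waves.append(ready)
            # Wait for the whole wave; with Spark, each task could trigger its own job
            list(pool.map(run_task, ready))
            sorter.done(*ready)
    return waves


executed = []
print(run_in_waves(DEPENDENCIES, executed.append))
# [['index_categorical_columns', 'rename_label_column'], ['assemble_features', 'index_label_column']]
```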

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session with optimized configurations
spark = SparkSession.builder \
    .appName("ParallelPreprocessing") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "/content/drive/MyDrive/Distributed Machine Learning Project/bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Preprocessing tasks as separate functions
def index_categorical_columns(df):
    categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
    indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]
    pipeline = Pipeline(stages=indexers)
    return pipeline.fit(df).transform(df)

def assemble_features(df):
    feature_columns = ["age", "balance", "day", "duration", "campaign", "pdays", "previous",
                       "job_index", "marital_index", "education_index", "default_index",
                       "housing_index", "loan_index", "contact_index", "month_index", "poutcome_index"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(df)

def rename_label_column(df):
    return df.withColumnRenamed("y", "label")

def index_label_column(df):
    label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
    return label_indexer.fit(df).transform(df)

# Apply the preprocessing tasks in dependency order (Spark parallelizes each step across partitions)
data = rename_label_column(data)
data = index_categorical_columns(data)
data = assemble_features(data)
data = index_label_column(data)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=[rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

# Stop the Spark session
spark.stop()


Test AUC: 0.8786274509803921


### **Explanation:**
**Spark Session Configuration:**

The Spark session is configured with explicit parallelism and memory settings: 4 shuffle partitions and 2 GB each of executor and driver memory.

**Preprocessing Functions:**

index_categorical_columns(df): Indexes categorical columns using StringIndexer.

assemble_features(df): Assembles feature columns into a feature vector.

rename_label_column(df): Renames the target column to "label".

index_label_column(df): Indexes the label column.

**Pipeline Construction:**

A Pipeline object wraps the Random Forest training step. The preprocessing is applied beforehand by the functions above, each of which takes a DataFrame and returns a transformed one.

**Parallel Processing:**

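The preprocessing steps must run in dependency order: the target column is renamed before it is indexed, and the categorical columns are indexed before the features are assembled. The parallelism comes from within each step. Spark processes every partition of the DataFrame concurrently, and because transformations are lazy, consecutive row-level operations are pipelined into the same stage. Each StringIndexer.fit call, by contrast, launches its own Spark job to count the values in its column.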

**Model Training and Evaluation:**

The Random Forest model is trained and evaluated using the preprocessed data.

**Resource Monitoring and Management:**

The code includes configurations for resource allocation, such as setting the number of shuffle partitions and memory allocations for the driver and executors.

### **Monitoring and Tuning:**
**Monitoring Tools:**

Use Spark UI to monitor job progress, task execution, and resource usage.
Use tools like Ganglia or Prometheus for detailed resource monitoring.

**Tuning Configurations:**

Adjust spark.sql.shuffle.partitions based on the size of the data and cluster resources.
Allocate sufficient memory to executors and driver based on the data size and complexity of transformations.
This approach ensures that multiple preprocessing tasks are managed and executed effectively in parallel, leveraging Spark's capabilities for distributed data processing.
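As a starting point for spark.sql.shuffle.partitions, the sketch below combines two rules of thumb: the Spark programming guide's suggestion of 2-4 partitions per CPU core, and a target partition size of about 128 MiB (the default of spark.sql.files.maxPartitionBytes for file reads, borrowed here as an assumption). The function is hypothetical, and its result should be tuned against what the Spark UI shows. For a small file like bank.csv it lands well below Spark's default of 200 shuffle partitions.

```python
import math


def suggest_shuffle_partitions(data_bytes, total_cores, target_partition_bytes=128 * 1024 * 1024, per_core=2):
    """Heuristic starting point for spark.sql.shuffle.partitions.

    Takes the larger of (a) enough partitions to keep each near target_partition_bytes
    and (b) per_core partitions per CPU core, so no core sits idle. Both defaults are assumptions.
    """
    by_size = math.ceil(data_bytes / target_partition_bytes)
    by_cores = total_cores * per_core
    return max(1, by_size, by_cores)


print(suggest_shuffle_partitions(500_000, total_cores=2))       # 4: a small CSV on 2 cores
print(suggest_shuffle_partitions(10 * 1024**3, total_cores=8))  # 80: 10 GiB in ~128 MiB pieces
```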