# Task 1: Efficient Data Handling through Data Parallelism with PySpark

## **Import Libraries**

In [None]:
!pip install pyspark



In [None]:
#Import Libraries
from pyspark.sql import SparkSession

In [None]:
# Create Spark session
spark = SparkSession.builder \
    .appName("DataParallelismBank") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()


Load the "bank.csv" dataset into a Spark DataFrame

In [None]:
# Load the dataset
data_path = "bank.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)


## **Data Preparation and Partitioning:**


In [None]:
# Inspect the first few rows
df.show(5)

+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 30| unemployed|married|  primary|     no|   1787|     no|  no|cellular| 19|  oct|      79|       1|   -1|       0| unknown| no|
| 33|   services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 35| management| single| tertiary|     no|   1350|    yes|  no|cellular| 16|  apr|     185|       1|  330|       1| failure| no|
| 30| management|married| tertiary|     no|   1476|    yes| yes| unknown|  3|  jun|     199|       4|   -1|       0| unknown| no|
| 59|blue-collar|married|secondary|     no|      0|    yes|  no| unknown|  5|  may|     22

Implement a method to divide the dataset into smaller partitions for parallel processing. What strategy did you use for partitioning, and why?

In [None]:
# Partition the dataset by the 'balance' column
partitioned_df = df.repartition(4, "balance")

# Verify the number of partitions
print(f"Number of partitions: {partitioned_df.rdd.getNumPartitions()}")

# Show the first five rows of the repartitioned DataFrame
partitioned_df.show(5)

Number of partitions: 4
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
|age|          job|marital|education|default|balance|housing|loan| contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+-------------+-------+---------+-------+-------+-------+----+--------+---+-----+--------+--------+-----+--------+--------+---+
| 33|     services|married|secondary|     no|   4789|    yes| yes|cellular| 11|  may|     220|       1|  339|       4| failure| no|
| 36|self-employed|married| tertiary|     no|    307|    yes|  no|cellular| 14|  may|     341|       1|  330|       2|   other| no|
| 41| entrepreneur|married| tertiary|     no|    221|    yes|  no| unknown| 14|  may|      57|       2|   -1|       0| unknown| no|
| 56|   technician|married|secondary|     no|   4073|     no|  no|cellular| 27|  aug|     239|       5|   -1|       0| unknown| no|
| 37|       admin.| single| tertiary|     no|   2317

I used the repartition method to hash-partition the DataFrame into four partitions on the balance column. Rows with the same balance always land in the same partition, so how evenly the rows spread depends on how varied the balance values are.

Reasons for using this method:
1. Distribution: Hashing a column with many distinct values, such as balance, tends to spread rows across the partitions, although a frequently repeated value (for example, a balance of 0) is sent entirely to one partition.
2. Parallel Processing: Once the data is partitioned, Spark can process each partition in a separate task, which improves the performance of data processing jobs.
3. Scalability: The same approach applies to larger datasets by raising the number of partitions, making it suitable for big data applications.
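To make the hash-partitioning behaviour concrete, here is a minimal plain-Python sketch (no Spark required). It uses Python's built-in hash as a stand-in; Spark uses its own Murmur3-based hash, so actual partition assignments differ. The balances list is hypothetical and only illustrates how a repeated value skews one partition.

```python
from collections import Counter

def assign_partition(key, num_partitions):
    """Map a key to a partition index by hashing (illustrative only;
    Spark uses its own Murmur3-based hash, not Python's hash())."""
    return hash(key) % num_partitions

def partition_sizes(keys, num_partitions):
    """Count how many rows fall into each partition."""
    sizes = Counter(assign_partition(k, num_partitions) for k in keys)
    return [sizes.get(i, 0) for i in range(num_partitions)]

# Hypothetical balances: 60 clients share the value 0, 40 have distinct values.
balances = [0] * 60 + list(range(1, 41))
sizes = partition_sizes(balances, 4)
print(sizes)  # [70, 10, 10, 10]
```

In the notebook itself, the real partition sizes could be checked with partitioned_df.groupBy(spark_partition_id()).count(), where spark_partition_id comes from pyspark.sql.functions.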

Summary:
1. We first create a Spark session whose settings control memory use and the number of shuffle partitions.
2. The "bank.csv" dataset is loaded into a Spark DataFrame.
3. We display a few rows to understand the structure of the DataFrame.
4. The DataFrame is repartitioned into four partitions on the balance column. The number of partitions can be adjusted to the dataset's size and the available resources.
5. We verify the partitioning by printing the total number of partitions.


## **Data Analysis and Processing in Parallel:**
Identify and calculate the average balance for each job category in the "bank.csv" dataset. Use parallel processing to perform this calculation. Describe your approach and the results.






**Calculate Average Balance for Each Job Category**

Use the groupBy and agg functions to calculate the average balance for each job category.



In [None]:
from pyspark.sql.functions import avg, sum as spark_sum

# Calculate average balance for each job category
avg_balance_per_job = df.groupBy("job").agg(avg("balance").alias("avg_balance"))
avg_balance_per_job.show()




+-------------+------------------+
|          job|       avg_balance|
+-------------+------------------+
|   unemployed|       1089.421875|
|     services|1103.9568345323742|
|      student|1543.8214285714287|
|      unknown|1501.7105263157894|
|   management|1766.9287925696594|
|  blue-collar| 1085.161733615222|
|self-employed|1392.4098360655737|
|       admin.|  1226.73640167364|
|   technician|     1330.99609375|
|    housemaid|2083.8035714285716|
| entrepreneur|          1645.125|
|      retired| 2319.191304347826|
+-------------+------------------+



Perform a parallel operation to identify the top 5 age groups in the dataset that have the highest loan amounts. Explain your methodology and present your findings.


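**Identify Top 5 Age Groups with Highest Loan Amounts**

Use the groupBy and agg functions to total the balance for each age, then take the five ages with the largest totals. In this dataset the loan column is a yes/no flag rather than an amount, so the balance column is summed as the numeric stand-in.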

In [None]:
# Sum 'balance' for each age. In bank.csv the 'loan' column is a yes/no flag,
# not an amount, so 'balance' is the numeric stand-in; the result keeps the name 'total_loan'.
total_loan_per_age_group = df.groupBy("age").agg(spark_sum("balance").alias("total_loan"))

# Identify the top 5 age groups with the highest loan amounts
top_5_age_groups = total_loan_per_age_group.orderBy("total_loan", ascending=False).limit(5)
top_5_age_groups.show()

# Stop the Spark session
spark.stop()

+---+----------+
|age|total_loan|
+---+----------+
| 33|    287447|
| 32|    281467|
| 38|    273320|
| 34|    256765|
| 31|    256408|
+---+----------+
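
The cell above treats each individual age as its own group. If "age group" is instead read as a band of ages, the same sum-then-rank idea applies after bucketing. The following is a plain-Python sketch under that assumption; the ten-year band width and the sample rows are hypothetical and not taken from bank.csv.

```python
from collections import defaultdict
import heapq

def age_band(age, width=10):
    """Return the lower bound of the band containing `age` (e.g. 33 -> 30)."""
    return (age // width) * width

def top_bands_by_total(rows, k=5, width=10):
    """Sum balance per age band and return the k bands with the largest totals."""
    totals = defaultdict(int)
    for age, balance in rows:
        totals[age_band(age, width)] += balance
    return heapq.nlargest(k, totals.items(), key=lambda item: item[1])

# Hypothetical (age, balance) rows, not taken from bank.csv.
rows = [(31, 500), (38, 700), (45, 300), (52, 900), (59, 200), (27, 100)]
print(top_bands_by_total(rows, k=2))  # [(30, 1200), (50, 1100)]
```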




### **Results:**
**Average Balance for Each Job Category:**

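The retired (about 2,319) and housemaid (about 2,084) categories have the highest average balances, while blue-collar (about 1,085) and unemployed (about 1,089) have the lowest.

**Top 5 Age Groups with Highest Loan Amounts:**

Ages 33, 32, 38, 34 and 31 have the largest totals, led by age 33 with 287,447. Because the loan column is a yes/no flag, these totals are sums of the balance column.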

### **Approach:**

**Parallel Processing:**

Spark runs groupBy and agg in parallel: each partition is aggregated by its own task, and the partial results are then shuffled and merged per key across the cluster.

**Efficiency:**

Because the work is divided by partition, the same code scales from this small file to much larger datasets by adding partitions and executors.
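
The two-phase pattern behind a parallel average can be sketched in plain Python: each partition produces partial (sum, count) pairs, and a merge step combines them. This is an illustration of the idea rather than Spark's internal code, and the two partitions of (job, balance) rows are hypothetical.

```python
from collections import defaultdict

def partial_aggregate(rows):
    """Per-partition step: accumulate (sum, count) of balance per job."""
    acc = defaultdict(lambda: [0, 0])
    for job, balance in rows:
        acc[job][0] += balance
        acc[job][1] += 1
    return acc

def merge_partials(partials):
    """Merge step: combine the partial (sum, count) pairs and divide."""
    total = defaultdict(lambda: [0, 0])
    for part in partials:
        for job, (s, c) in part.items():
            total[job][0] += s
            total[job][1] += c
    return {job: s / c for job, (s, c) in total.items()}

# Two hypothetical partitions of (job, balance) rows.
partitions = [
    [("services", 100), ("student", 300)],
    [("services", 200), ("student", 500), ("student", 100)],
]
avg_balance = merge_partials(partial_aggregate(p) for p in partitions)
print(avg_balance)  # {'services': 150.0, 'student': 300.0}
```

Keeping sums and counts separate until the merge is what makes the result independent of how rows are split across partitions.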

## **Model Training on Partitioned Data:**

Choose a classification model to predict whether a client will subscribe to a term deposit (target variable 'y').

Briefly explain why you selected this model.

Partition the dataset into training and testing sets and train your model on these partitions.

Discuss any challenges you faced in parallelizing the training process and how you addressed them.


### **Model Selection:**

We will use a Random Forest Classifier to predict whether a client will subscribe to a term deposit (target variable 'y').

We chose a Random Forest classifier because it is an ensemble method that combines many decision trees to improve predictive accuracy and control overfitting.
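
The ensemble idea can be sketched in a few lines of plain Python: every tree is trained on a bootstrap sample of the rows, and the per-tree predictions are combined by vote. This is a simplification for illustration; the library's exact aggregation rule may differ (for example, by weighting votes with class proportions), and the predictions below are hypothetical.

```python
import random
from collections import Counter

def bootstrap_sample(rows, rng):
    """Draw len(rows) rows with replacement, as bagging does for each tree."""
    return [rng.choice(rows) for _ in rows]

def majority_vote(votes):
    """Return the most common class label among the per-tree predictions."""
    return Counter(votes).most_common(1)[0][0]

rng = random.Random(1234)
rows = list(range(10))  # stand-in row ids
sample = bootstrap_sample(rows, rng)
print(len(sample), len(set(sample)) <= len(rows))

# Hypothetical predictions from five trees for one client.
print(majority_vote(["no", "yes", "no", "no", "yes"]))  # no
```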

### **Data Partitioning and Model Training:**

The steps to implement a Random Forest classifier in PySpark are:

1. Load and Prepare Data: Ensure the data is ready for modeling.

2. Partition the Dataset: Split the data into training and testing sets.

3. Train the Random Forest Model: Fit the Random Forest model on the training set.

4. Evaluate the Model: Assess the model’s performance on the testing set.
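
One detail of the split step is worth illustrating: a random split assigns rows by random draws, so an 80/20 weighting is met only approximately, while a fixed seed keeps the split reproducible. The sketch below shows that idea in plain Python; random_split is a hypothetical helper, not the Spark method.

```python
import random

def random_split(rows, train_fraction=0.8, seed=1234):
    """Assign each row to train or test with an independent random draw,
    so the resulting sizes are only approximately 80/20."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train, test = random_split(rows)
print(len(train), len(test))
```

Running the function twice with the same seed returns the same two lists, which is why the notebook passes a seed when splitting.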

### **Challenges and Solutions:**

1. Parallelization: PySpark inherently parallelizes the training of Random Forests, but ensuring efficient data partitioning and avoiding bottlenecks in resource allocation is key to speeding up the training process.


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session
spark = SparkSession.builder \
    .appName("BankTermDepositPrediction") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


# Save the trained model, overwriting if the path already exists
model.write().overwrite().save("/content/drive/MyDrive/ml_model_trained")

# Stop the Spark session
spark.stop()


Test AUC: 0.8738954248365991
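
For readers unfamiliar with the metric, the area under the ROC curve equals the fraction of (positive, negative) pairs in which the positive example receives the higher score. The plain-Python sketch below computes it that way on four hypothetical predictions; it illustrates the definition and is not how the evaluator is implemented.

```python
def auc_from_scores(labels, scores):
    """Area under the ROC curve as the fraction of (positive, negative)
    pairs in which the positive has the higher score; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels and predicted probabilities for four clients.
print(auc_from_scores([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so the reported test AUC of about 0.87 indicates that the model ranks subscribers above non-subscribers in most pairs.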


### **Summary:**

1. **Data Loading and Preparation:**

**Spark Session:**

We set up a Spark session with specific configurations to optimize parallel processing.

**Load Dataset:**

The dataset is loaded into a Spark DataFrame.

**Handle Categorical Variables:**

We use StringIndexer to convert categorical variables to numerical values.

**Rename Target Column:**

The target variable 'y' is renamed to 'label'.

**Convert Label Column:**

The label column is converted to numerical values using StringIndexer.

**Assemble Features:**

We use VectorAssembler to combine all features into a single vector column.

2. **Training the Model:**

**Initialize and Train the Model:**

A Random Forest classifier is initialized and trained using the pipeline.

**Evaluate the Model:**

The model is evaluated on the test set using the AUC metric.

**Save the Model:**

The trained model is saved for future use.

### **Challenges and Solutions:**

**Handling Categorical Variables:**

The categorical variables were handled using StringIndexer to convert them to numerical values required for the Random Forest algorithm.
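
What the indexing step does can be shown with a small plain-Python sketch. By default, StringIndexer orders labels by descending frequency, so the most frequent label receives index 0.0. The helper below imitates that ordering; fit_string_index is a hypothetical name, the alphabetical tie-break is an assumption of this sketch, and the sample values are made up.

```python
from collections import Counter

def fit_string_index(values):
    """Build a label -> index map, most frequent label first (index 0.0);
    ties are broken alphabetically in this sketch."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical 'marital' values.
marital = ["married", "single", "married", "divorced", "single", "married"]
mapping = fit_string_index(marital)
print(mapping)  # {'married': 0.0, 'single': 1.0, 'divorced': 2.0}
print([mapping[v] for v in marital])
```

The resulting indices carry no natural order, which tree-based models such as Random Forests tolerate better than linear models do.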

**Parallel Processing:**

By using Spark's MLlib and ensuring the data was evenly partitioned, we leveraged Spark's distributed computing capabilities to speed up the training process.

**Resource Management:**

Setting appropriate configurations for executor and driver memory ensured efficient resource usage.

## **Resource Monitoring and Management:**
Implement resource monitoring during data processing and model training. What observations did you make regarding CPU and memory usage?



We use **Spark Event Logging** to capture detailed information about the Spark application's execution.




### **Resource Monitoring Observations**

Spark Event Logging is a powerful feature that enables us to capture detailed logs about our Spark applications, including job execution details, stage breakdowns, task attempts, and resource usage. These logs are extremely useful for debugging, performance tuning, and understanding the behavior of our Spark applications.

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Path to the event log directory
event_log_dir = "eventlog"

# Create the event log directory if it doesn't exist
os.makedirs(event_log_dir, exist_ok=True)

# Create Spark session with resource monitoring enabled
spark = SparkSession.builder \
    .appName("BankTermDepositPredictionlog") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", event_log_dir) \
    .getOrCreate()

# Load the dataset
data_path = "bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Handle categorical variables using StringIndexer
categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]

# Rename the target column to 'label'
data = data.withColumnRenamed("y", "label")

# Convert the label column to numerical values
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["age", "balance", "day", "duration", "campaign", "pdays", "previous"] + [col+"_index" for col in categorical_columns],
    outputCol="features"
)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")


# Stop the Spark session
spark.stop()


Test AUC: 0.8738954248365991


### **Summary:**
1. **Event Log Directory Creation:** The code creates the event log directory if it does not already exist.
2. **Spark Session Configuration:** Event logging is enabled through spark.eventLog.enabled, and spark.eventLog.dir points to that directory.
3. **Data Preparation and Model Training:** The rest of the code loads the dataset, processes it, trains a Random Forest classifier and evaluates the model.
4. **Stopping the Spark Session:** The Spark session is stopped once model training and evaluation are complete.
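
Once the session has stopped, the files in the event log directory can be inspected directly, because an uncompressed event log stores one JSON object per line with an "Event" field naming the event type. The sketch below counts events by type. It runs on a small synthetic log so that it is self-contained; the three sample records are made up, real logs carry many more fields, and count_events is a hypothetical helper.

```python
import json
import os
import tempfile
from collections import Counter

def count_events(log_path):
    """Count event types in a Spark event log (one JSON object per line)."""
    counts = Counter()
    with open(log_path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            event = json.loads(line)
            counts[event.get("Event", "unknown")] += 1
    return counts

# Synthetic three-line log for demonstration only.
sample_lines = [
    {"Event": "SparkListenerJobStart", "Job ID": 0},
    {"Event": "SparkListenerTaskEnd", "Stage ID": 0},
    {"Event": "SparkListenerTaskEnd", "Stage ID": 0},
]
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample-eventlog")
    with open(sample_path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(json.dumps(e) for e in sample_lines))
    event_counts = count_events(sample_path)
print(event_counts)
```

Pointing count_events at a file under the eventlog directory gives a quick view of how many jobs, stages and tasks the run produced; the Spark History Server offers a fuller view of the same logs.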

## **Task Management and Scheduling:**

Manage multiple parallel tasks, such as different preprocessing tasks. How did you ensure the effective management of these tasks?


Managing multiple parallel tasks, such as different preprocessing tasks, is essential in data processing workflows, especially when working with large datasets or complex pipelines. Effective management of these tasks involves optimizing resource usage, minimizing task execution time, and ensuring that tasks do not interfere with each other.
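
Spark's scheduler accepts jobs submitted from separate threads of the same application, so tasks that do not depend on each other can be launched together from the driver. The plain-Python sketch below shows the submission pattern with a thread pool. The three task functions and the sample rows are hypothetical stand-ins for independent Spark actions; steps that depend on each other, such as assembling features after indexing, still have to run in order.

```python
from concurrent.futures import ThreadPoolExecutor

def count_rows(rows):
    """Hypothetical task 1: count the records."""
    return len(rows)

def distinct_jobs(rows):
    """Hypothetical task 2: collect the distinct job values."""
    return sorted({job for job, _ in rows})

def max_balance(rows):
    """Hypothetical task 3: find the largest balance."""
    return max(balance for _, balance in rows)

rows = [("services", 100), ("student", 300), ("services", 250)]
tasks = {"count": count_rows, "jobs": distinct_jobs, "max": max_balance}

# Submit the independent tasks together and collect results by name.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {name: pool.submit(task, rows) for name, task in tasks.items()}
    results = {name: future.result() for name, future in futures.items()}
print(results)  # {'count': 3, 'jobs': ['services', 'student'], 'max': 300}
```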




In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create Spark session with optimized configurations
spark = SparkSession.builder \
    .appName("ParallelPreprocessing") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Load the dataset
data_path = "bank.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True)

# Preprocessing tasks as separate functions
def index_categorical_columns(df):
    categorical_columns = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
    indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in categorical_columns]
    pipeline = Pipeline(stages=indexers)
    return pipeline.fit(df).transform(df)

def assemble_features(df):
    feature_columns = ["age", "balance", "day", "duration", "campaign", "pdays", "previous",
                       "job_index", "marital_index", "education_index", "default_index",
                       "housing_index", "loan_index", "contact_index", "month_index", "poutcome_index"]
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(df)

def rename_label_column(df):
    return df.withColumnRenamed("y", "label")

def index_label_column(df):
    label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
    return label_indexer.fit(df).transform(df)

# Apply the preprocessing steps one after another; Spark executes each step in parallel across partitions
data = rename_label_column(data)
data = index_categorical_columns(data)
data = assemble_features(data)
data = index_label_column(data)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=100)

# Define the pipeline
pipeline = Pipeline(stages=[rf])

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label_index", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc}")

# Stop the Spark session
spark.stop()


Test AUC: 0.8738954248365991


### **Summary:**


Here’s how we can manage multiple parallel tasks effectively in Spark:

**Parallelizing Tasks in Spark:**

Spark inherently supports parallel processing, but running multiple tasks efficiently in parallel requires careful management of resources and task dependencies.
Here, the Spark session is configured with explicit memory allocations and a fixed number of shuffle partitions.

**Managing Resources:**

Managing resources effectively is crucial when running multiple tasks in parallel, especially in a distributed environment like Spark.

**Data Partitioning and Task Parallelism:**

Effective data partitioning is essential to ensure that tasks run in parallel without contention.

**Preprocessing Functions:**

index_categorical_columns(df): Indexes categorical columns using StringIndexer.

assemble_features(df): Assembles feature columns into a feature vector.

rename_label_column(df): Renames the target column to "label".

index_label_column(df): Indexes the label column.

**Pipeline Construction:**

Pipeline objects are used in two places: one chains the StringIndexer stages inside index_categorical_columns, and another wraps the Random Forest stage for training.


**Model Training and Evaluation:**

The Random Forest model is trained and evaluated using the preprocessed data.

**Resource Monitoring and Management:**

The code includes configurations for resource allocation, such as setting the number of shuffle partitions and memory allocations for the driver and executors.

**Monitoring and Managing Task Execution:**

Monitor the progress of parallel tasks using the Spark UI, where you can see the execution of jobs and stages in real-time. This helps in identifying any bottlenecks or imbalances in task execution.

**Monitoring Tools:**

Use the Spark UI to monitor job progress, task execution, and resource usage.

Use tools like Ganglia or Prometheus for detailed resource monitoring.

**Tuning Configurations:**

Adjust spark.sql.shuffle.partitions based on the size of the data and cluster resources.

Allocate sufficient memory to executors and driver based on the data size and complexity of transformations.

This approach ensures that multiple preprocessing tasks are managed and executed effectively, leveraging Spark's capabilities for distributed data processing.
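
As a starting point for tuning the shuffle partition count, a common rule of thumb is to aim for partitions of roughly a hundred megabytes and never to go below the number of available cores. The sketch below encodes that heuristic. The 128 MiB target and the function name are assumptions of this example rather than Spark defaults, and the input sizes are hypothetical.

```python
import math

def suggest_shuffle_partitions(data_size_bytes, total_cores,
                               target_partition_bytes=128 * 1024 * 1024):
    """Heuristic: enough partitions to keep each near the target size,
    but never fewer than the number of available cores."""
    by_size = math.ceil(data_size_bytes / target_partition_bytes)
    return max(by_size, total_cores, 1)

# A file of a few hundred kilobytes on a 4-core machine (hypothetical sizes).
print(suggest_shuffle_partitions(500 * 1024, total_cores=4))    # 4
# 10 GiB of shuffle data on the same machine.
print(suggest_shuffle_partitions(10 * 1024**3, total_cores=4))  # 80
```

For a small file like the one in this notebook, the heuristic lands on the core count, which is consistent with the setting of 4 used in the sessions above.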




