<a href="https://www.kaggle.com/code/dsptlp/spark?scriptVersionId=163781797" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

<img src="https://spark.apache.org/docs/3.1.3/api/python/_static/spark-logo-reverse.png" alt="Apache Spark logo" width="200"/>

# SPARK 
- Reasons to use Spark
- A comparison of Spark with pandas and scikit-learn on three tasks: loading a CSV file, PCA, and random-forest training

# NOTE 
- Spark is designed for distributed computing and is most effective on large datasets processed by a cluster of machines.
- Kaggle provides a single machine, so here Spark runs in local mode rather than on a cluster. Its only benefit in this setting is that it can spread work across all of the machine's CPU cores.
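Even in local mode, Spark follows its data-parallel model: the data is split into partitions, each partition is processed by a worker thread (with `local[*]`, one thread per logical core), and the partial results are merged. The sketch below imitates that pattern in plain Python for a per-label count like the one used later in this notebook. It is only an illustration: `split_into_partitions` and `count_labels_in_partitions` are hypothetical helpers, not Spark APIs, and in standard CPython these threads will not show Spark's speedup because the GIL lets only one thread run Python bytecode at a time.

```python
import os
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Split a list into at most num_partitions contiguous chunks of similar size."""
    size = max(1, -(-len(rows) // num_partitions))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def count_labels_in_partitions(labels, num_workers=None):
    """Hypothetical helper: count labels per partition in parallel, then merge.

    Follows the shape of groupBy("label").count(): each partition produces a
    partial count, and the partial counts are combined into the final result.
    """
    num_workers = num_workers or os.cpu_count() or 1
    partitions = split_into_partitions(labels, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(Counter, partitions))
    total = Counter()
    for partial in partial_counts:
        total.update(partial)
    return total


print(count_labels_in_partitions([0, 1, 0, 0, 1, 0, 1, 0], num_workers=3))
```

Merging small per-partition summaries, instead of moving raw rows around, is what keeps aggregations like this cheap when the data is spread over many cores or machines.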

# SPARK ADVANTAGES

1. **Speed:** Spark is known for its speed, as it can perform in-memory processing, reducing the need to write intermediate results to disk. This makes Spark well-suited for iterative algorithms and interactive data analysis.

2. **Ease of Use:** Spark provides high-level APIs in languages such as Scala, Java, Python, and R, making it accessible to a wide range of users. It also offers built-in libraries for various tasks like SQL, machine learning (MLlib), graph processing (GraphX), and stream processing (Spark Streaming).

3. **Scalability:** Spark is designed for distributed computing, allowing it to scale horizontally across a cluster of machines. This makes it suitable for handling large datasets and processing tasks that would be challenging for single-node systems.

4. **Versatility:** Spark supports a variety of data processing scenarios, including batch processing, interactive queries, streaming analytics, and machine learning. This versatility makes it a preferred choice for organizations with diverse data analysis needs.

5. **Fault Tolerance:** Spark provides fault tolerance through lineage information and resilient distributed datasets (RDDs). If a node fails, Spark can recompute the lost data using the lineage information, ensuring the reliability of data processing.

6. **Integration with Big Data Ecosystem:** Spark seamlessly integrates with other big data tools and technologies, such as Hadoop Distributed File System (HDFS), Apache Hive, Apache HBase, and more. This allows users to leverage existing data storage and processing systems.

7. **Community Support:** Spark has a large and active open-source community. This means continuous development, improvements, and a wealth of resources, including documentation, forums, and tutorials.

8. **In-Memory Processing:** Spark's ability to store intermediate data in memory rather than writing to disk can significantly improve performance, especially for iterative algorithms and interactive data analysis, compared to traditional disk-based processing.
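To make the lineage and in-memory points (5 and 8) concrete, here is a deliberately tiny, pure-Python model. `ToyRDD` is a hypothetical class, not part of PySpark, and real Spark adds scheduling, shuffles, and spilling to disk. The sketch only shows three ideas: transformations record how to rebuild data instead of computing it, an action triggers the work, and a lost cached partition can be recomputed from its lineage without redoing the others.

```python
class ToyRDD:
    """Hypothetical, heavily simplified stand-in for a Spark RDD."""

    def __init__(self, partitions=None, parent=None, func=None):
        self._source = partitions   # only set on the root dataset
        self._parent = parent       # lineage: which dataset this one is derived from
        self._func = func           # lineage: how a parent partition is transformed
        self._cache = None          # in-memory copy of computed partitions
        self.compute_count = 0      # how many partitions this dataset has (re)computed

    @property
    def num_partitions(self):
        return len(self._source) if self._parent is None else self._parent.num_partitions

    def map(self, f):
        # A transformation: records lineage, computes nothing yet
        return ToyRDD(parent=self, func=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, func=lambda part: [x for x in part if pred(x)])

    def cache(self):
        self._cache = {}
        return self

    def compute_partition(self, i):
        if self._cache is not None and i in self._cache:
            return self._cache[i]
        if self._parent is None:
            result = list(self._source[i])
        else:
            # Rebuild this partition by replaying the lineage from the parent
            result = self._func(self._parent.compute_partition(i))
        self.compute_count += 1
        if self._cache is not None:
            self._cache[i] = result
        return result

    def lose_cached_partition(self, i):
        # Simulate a failure that drops one cached partition
        if self._cache is not None:
            self._cache.pop(i, None)

    def collect(self):
        # An action: this is where the work actually happens
        return [x for i in range(self.num_partitions) for x in self.compute_partition(i)]


base = ToyRDD(partitions=[[1, 2, 3], [4, 5, 6]])
even_squares = base.map(lambda x: x * x).filter(lambda x: x % 2 == 0).cache()
print(even_squares.collect())  # [4, 16, 36]
```

Calling `collect()` again is served from the cache; after `lose_cached_partition(1)`, only partition 1 is recomputed from `base`.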


In [None]:
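# Install PySpark if it is not already available
import subprocess
import sys

try:
    import pyspark
except ImportError:
    print("pyspark not found. Installing...")
    # Install into the running kernel's environment and keep pip's output in a log file
    with open("pyspark.log.txt", "w") as log:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark"], stdout=log)
    print("pyspark installed successfully!")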

pyspark not found. Installing...


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.preprocessing import StandardScaler
from matplotlib.lines import Line2D
from matplotlib import cm
import numpy as np 
import pandas as pd
import seaborn as sns
import warnings
import timeit

from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as SparkRFClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier as PandasRFClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV

# Suppress all warnings
warnings.filterwarnings("ignore")

In [None]:
# Create a Spark session in local mode, using all available CPU cores
spark = SparkSession.builder.master("local[*]").appName("Spark").getOrCreate()

# Silence Spark's log output
spark.sparkContext.setLogLevel("OFF")

# Print the Spark version
print("Spark Version:", spark.version)

In [None]:
file_path = "/kaggle/input/tabular-dataset-ready-for-malicious-url-detection/train_dataset.csv"

# LOADING DATA

In [None]:
measures = []
column_list = ['label','url_has_login','url_has_client','url_has_server','url_len']
runs = 1
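The loading cells below time each step with `timeit` and then call the same function again to keep its result, so each load runs twice. A small helper that runs a step once, records its wall-clock time in `measures`, and returns the result avoids the duplicate work. `time_step` is a hypothetical name introduced here; it uses `time.perf_counter`, the same clock `timeit` uses by default.

```python
import time


def time_step(framework, task, func, measures, *args, **kwargs):
    """Run func once, append (framework, task, seconds) to measures, and return func's result."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    measures.append((framework, task, elapsed))
    print(f"Execution time using {framework}: {elapsed:.3f} seconds")
    return result
```

For example, `spark_df = time_step('SPARK', 'load_csv', load_csv_using_spark, measures)` would replace the `timeit` call and the second load. For Spark, the timed function must end with an action, as `load_csv_using_spark` does.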

## SPARK

In [None]:
def load_csv_using_spark():
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Summarize: count the number of records per label
    summary_df = df.groupBy("label").count()

    # Spark evaluates lazily: groupBy/count only build a plan, so an action
    # such as collect() is needed to actually read the data and run the job
    summary_df.collect()

    return df
    
# Measure the execution time
execution_time = timeit.timeit(load_csv_using_spark, number=runs)

# Print the result
print(f"Execution time using SPARK: {execution_time} seconds")
measures.append(('SPARK','load_csv',execution_time))

spark_df = load_csv_using_spark()
spark_df = spark_df.select(column_list)
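The `collect()` call in `load_csv_using_spark` matters because Spark transformations such as `groupBy(...).count()` only build a plan, and nothing runs until an action. (With `inferSchema=True`, `spark.read.csv` does scan the file once up front to infer column types.) Timing a function that returns before any action would mostly measure plan construction. Python generators are lazy in a similar way, which gives a quick, Spark-free way to see the effect; `slow_square` and `build_pipeline` are illustrative names only.

```python
import time


def slow_square(x):
    time.sleep(0.01)  # stand-in for real per-row work
    return x * x


def build_pipeline(values):
    # Like a Spark transformation: describes the work but does not do it yet
    return (slow_square(v) for v in values if v % 2 == 0)


values = range(20)

start = time.perf_counter()
lazy_squares = build_pipeline(values)
build_seconds = time.perf_counter() - start

start = time.perf_counter()
total = sum(lazy_squares)  # like a Spark action: forces every step to run
run_seconds = time.perf_counter() - start

print(f"building: {build_seconds:.4f}s, running: {run_seconds:.4f}s, total = {total}")
```

Almost all of the time shows up in the second measurement, which is why Spark timings in this notebook should always wrap an action.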

## PANDAS

In [None]:
def load_csv_using_pandas():
    data_df = pd.read_csv(file_path, delimiter=',') 
    summary_df = data_df[['url_has_login','label']].groupby(['label']).count()
    return data_df
    
# Measure the execution time
execution_time = timeit.timeit(load_csv_using_pandas, number=runs)

# Print the result
print(f"Execution time using PANDAS: {execution_time} seconds")
measures.append(('PANDAS','load_csv',execution_time))

pandas_df = load_csv_using_pandas()
pandas_df = pandas_df[column_list]

# PCA

## SPARK

In [None]:
from pyspark.ml.feature import PCA, VectorAssembler

# PCA expects a single vector column, so assemble the numeric features first
feature_cols = [c for c in column_list if c != "label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(spark_df)

# Apply PCA
pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(assembled_df)
result = model.transform(assembled_df)

# Show a few rows of the result and the variance explained by each component
result.select("features", "pca_features").show(5, truncate=False)
print("Explained variance:", model.explainedVariance)

# Bring a bounded random sample to the driver for plotting
rows = result.select("pca_features", "label").sample(fraction=0.01, seed=42).limit(5000).collect()
pcs = np.array([row.pca_features.toArray() for row in rows])
labels = np.array([row.label for row in rows])

# Plot the first two principal components, one colour per label
for lab in np.unique(labels):
    mask = labels == lab
    plt.scatter(pcs[mask, 0], pcs[mask, 1], s=5, alpha=0.5, label=f"Label {lab}")

plt.title("PCA Visualization with Labels")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.legend()
plt.show()
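What a PCA with `k` components computes can be written in a few lines of NumPy: center the data, take the singular value decomposition, and project onto the top `k` right singular vectors. This sketch also standardizes the columns first, because `url_len` is a length while the other features appear to be 0/1 flags, so without scaling the first component would mostly track `url_len`. Standardizing is our assumption, not something the Spark cell does; in Spark it would correspond to adding a `StandardScaler` stage (with `withMean=True` and `withStd=True`) before `PCA`. `pca_numpy` is a hypothetical helper.

```python
import numpy as np


def pca_numpy(X, k, standardize=True):
    """Project the rows of X onto their top-k principal components.

    Returns (projected, components, explained_variance_ratio).
    """
    X = np.asarray(X, dtype=float)
    X = X - X.mean(axis=0)                    # PCA is defined on centered data
    if standardize:
        std = X.std(axis=0)
        X = X / np.where(std == 0, 1.0, std)  # leave constant columns unscaled
    # Rows of vt are the principal directions, ordered by singular value
    _, s, vt = np.linalg.svd(X, full_matrices=False)
    components = vt[:k]
    explained_variance_ratio = (s ** 2 / np.sum(s ** 2))[:k]
    return X @ components.T, components, explained_variance_ratio


# Synthetic stand-in data: three 0/1 flags plus a length-like column
rng = np.random.default_rng(0)
X_demo = np.column_stack([rng.integers(0, 2, size=(500, 3)), rng.normal(60, 25, size=500)])
projected, components, ratio = pca_numpy(X_demo, k=2)
print(projected.shape, ratio)
```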

## SCIKIT-LEARN
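A scikit-learn counterpart to the Spark PCA cell could look like the sketch below, which keeps three components to match `k=3` and optionally standardizes the four numeric columns first. `pca_with_sklearn` is a hypothetical helper, and the demo runs on a small synthetic DataFrame with the notebook's column names so that it is self-contained; in the notebook you would pass `pandas_df` instead. The estimator is imported as `SklearnPCA` so it does not clash with Spark's `PCA`.

```python
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA as SklearnPCA
from sklearn.preprocessing import StandardScaler

FEATURE_COLS = ['url_has_login', 'url_has_client', 'url_has_server', 'url_len']


def pca_with_sklearn(df, k=3, standardize=True):
    """Hypothetical helper: project the numeric feature columns onto k principal components."""
    X = df[FEATURE_COLS].to_numpy(dtype=float)
    if standardize:
        X = StandardScaler().fit_transform(X)
    pca = SklearnPCA(n_components=k)
    projected = pca.fit_transform(X)
    return projected, pca.explained_variance_ratio_


# Synthetic stand-in for pandas_df, only so the sketch runs on its own
rng = np.random.default_rng(42)
demo_df = pd.DataFrame({
    'label': rng.integers(0, 2, 1000),
    'url_has_login': rng.integers(0, 2, 1000),
    'url_has_client': rng.integers(0, 2, 1000),
    'url_has_server': rng.integers(0, 2, 1000),
    'url_len': rng.normal(60, 25, 1000),
})
projected, ratio = pca_with_sklearn(demo_df, k=3)
print(projected.shape, ratio)
```

The first two columns of `projected` can be plotted per label in the same way as in the Spark cell.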

# RANDOMFOREST

## SPARK

In [None]:
def rf_using_spark():
    numeric_cols = ['url_has_login', 'url_has_client', 'url_has_server', 'url_len']
    label = 'label'

    # Combine the numeric columns into a single vector column, as Spark ML expects
    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="Numfeatures")
    df = assembler.transform(spark_df)

    # Split the data into training and testing sets
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

    # Create the RandomForestClassifier (seeded for reproducibility)
    rf = SparkRFClassifier(featuresCol="Numfeatures", labelCol=label, seed=42)

    # Create a pipeline
    pipeline = Pipeline(stages=[rf])

    # Set up a parameter grid and cross-validator
    paramGrid = (ParamGridBuilder()
                 .addGrid(rf.numTrees, [10, 20, 30])
                 .addGrid(rf.maxDepth, [5, 10, 15])
                 .build())

    evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5)

    # Fit the model
    cv_model = crossval.fit(train_data)

    # Make predictions on the test set
    predictions = cv_model.transform(test_data)

    # Evaluate the model
    area_under_roc = evaluator.evaluate(predictions)
    print(f"Area under ROC: {area_under_roc}")
    
# Measure the execution time
execution_time = timeit.timeit(rf_using_spark, number=runs)

# Print the result
print(f"Execution time using SPARK: {execution_time} seconds")
measures.append(('SPARK','RF',execution_time))    
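Most of the time measured in the random-forest cells goes into model fitting, and the number of fits follows directly from the grid: `CrossValidator` trains one model per parameter combination per fold and then refits the best combination on the whole training set, and `GridSearchCV` with its default `refit=True` does the same. The hypothetical helper below does that arithmetic and also counts the trees trained during cross-validation, which helps when comparing this cell with the scikit-learn one, whose grid is different.

```python
from itertools import product


def cv_cost(grid, num_folds, trees_param, refit=True):
    """Count model fits and cross-validation trees for an exhaustive grid search.

    grid maps parameter names to candidate values; trees_param names the
    parameter that holds the number of trees (e.g. 'numTrees' or 'n_estimators').
    """
    names = list(grid)
    combos = [dict(zip(names, values)) for values in product(*grid.values())]
    fits = len(combos) * num_folds + (1 if refit else 0)
    cv_trees = sum(combo[trees_param] for combo in combos) * num_folds
    return {"combinations": len(combos), "fits": fits, "cv_trees": cv_trees}


spark_grid = {"numTrees": [10, 20, 30], "maxDepth": [5, 10, 15]}
sklearn_grid = {"n_estimators": [50], "max_depth": [5, 10, 15]}
print(cv_cost(spark_grid, 5, "numTrees"))        # {'combinations': 9, 'fits': 46, 'cv_trees': 900}
print(cv_cost(sklearn_grid, 5, "n_estimators"))  # {'combinations': 3, 'fits': 16, 'cv_trees': 750}
```

Because the two searches train different numbers of models and trees, their execution times are not a like-for-like comparison unless the grids are aligned.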

## SCIKIT-LEARN

In [None]:
def rf_using_scikit():

    # Split the data into features (X) and target variable (y)
    X = pandas_df.drop("label", axis=1)  
    y = pandas_df["label"]

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Define the parameter grid to search
    param_grid = {
        'n_estimators': [50],
        'max_depth': [5, 10, 15]
    }

    # Base model; n_estimators and max_depth are overridden by the grid values
    rf = PandasRFClassifier(n_estimators=100, max_depth=10, random_state=42, n_jobs=-1)

    # Create GridSearchCV
    grid_search = GridSearchCV(rf, param_grid, cv=5, scoring='roc_auc')

    # Run the 5-fold cross-validated grid search (this also refits the best model)
    grid_search.fit(X_train, y_train)

    # Get the best parameters from the grid search
    best_params = grid_search.best_params_

    # Get the best model from the grid search
    best_rf = grid_search.best_estimator_

    # GridSearchCV refits the best parameters on the full training set by
    # default (refit=True), so best_rf is already trained

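    # ROC AUC needs ranking scores, so use the predicted probability of the
    # positive class rather than hard 0/1 predictions
    probabilities = best_rf.predict_proba(X_test)[:, 1]

    # Evaluate the best model
    area_under_roc = roc_auc_score(y_test, probabilities)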
    print(f"Area under ROC: {area_under_roc}")

        
# Measure the execution time
execution_time = timeit.timeit(rf_using_scikit, number=runs)

# Print the result
print(f"Execution time using SCIKIT: {execution_time} seconds")
measures.append(('SCIKIT','RF',execution_time)) 
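ROC AUC measures how well a score ranks positives above negatives, so it should be computed from predicted probabilities (Spark's `BinaryClassificationEvaluator` reads its `rawPrediction` column) rather than from hard 0/1 predictions. The pure-Python sketch below uses the ranking interpretation: the fraction of (positive, negative) pairs in which the positive gets the higher score, with ties counted as one half. `roc_auc` is a hypothetical helper; it is quadratic in the number of samples and meant only to show the difference, not to replace `roc_auc_score`.

```python
def roc_auc(y_true, scores):
    """ROC AUC as P(score of a random positive > score of a random negative), ties count 1/2."""
    positives = [s for y, s in zip(y_true, scores) if y == 1]
    negatives = [s for y, s in zip(y_true, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


y_true = [0, 0, 1, 1]
probabilities = [0.1, 0.2, 0.3, 0.45]                      # every positive ranked above every negative
hard_predictions = [int(p >= 0.5) for p in probabilities]  # all 0 at a 0.5 threshold

print(roc_auc(y_true, probabilities))     # 1.0
print(roc_auc(y_true, hard_predictions))  # 0.5
```

The same model ranks the examples perfectly, yet thresholding first collapses every score to a tie and the AUC drops to chance level.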

# FINAL RESULTS

In [None]:
print(measures)
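`measures` holds `(framework, task, seconds)` tuples. A small, hypothetical helper can group them per task so the Spark time sits next to the pandas or scikit-learn time. The numbers in the demo are placeholders that only show the output format; they are not results from this notebook.

```python
def summarize_measures(measures):
    """Group (framework, task, seconds) tuples into {task: {framework: seconds}}."""
    table = {}
    for framework, task, seconds in measures:
        table.setdefault(task, {})[framework] = seconds
    return table


def print_summary(measures):
    for task, times in summarize_measures(measures).items():
        row = ", ".join(f"{fw}: {sec:.2f}s" for fw, sec in sorted(times.items()))
        print(f"{task:<10} {row}")


# Placeholder values only; pass the notebook's `measures` list instead
print_summary([('SPARK', 'load_csv', 1.0), ('PANDAS', 'load_csv', 2.0),
               ('SPARK', 'RF', 3.0), ('SCIKIT', 'RF', 4.0)])
```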