Coding Part - Descriptive Statistics & Machine Learning

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')


Mounted at /content/gdrive


In [None]:
csv_file_path = "/content/gdrive/MyDrive/cc_project/Boston.csv"

In [None]:
from pyspark.sql import SparkSession

In [None]:
#reading the csv
df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_file_path)

In [None]:
df.dtypes

[('crim', 'double'),
 ('zn', 'double'),
 ('indus', 'double'),
 ('chas', 'int'),
 ('nox', 'double'),
 ('rm', 'double'),
 ('age', 'double'),
 ('dis', 'double'),
 ('rad', 'int'),
 ('tax', 'int'),
 ('ptratio', 'double'),
 ('b', 'double'),
 ('lstat', 'double'),
 ('medv', 'double')]

In [None]:
# target = input("Please enter the value for Target variable name: ")

In [None]:
#clean column names -> column names with no special characters
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

def clean_column_names(df, replacement_char='_'):
    """Return ``df`` with special characters removed from its column names.

    Every run of characters other than ASCII letters, digits, ``_`` and ``-``
    in a column name is replaced by ``replacement_char``. Only the names are
    rewritten; the column values and types are left as they are.

    Args:
        df: object exposing ``columns`` and ``toDF(*names)`` (a Spark DataFrame).
        replacement_char: text substituted for each run of invalid characters.

    Returns:
        The result of ``df.toDF(*cleaned_names)``.

    Note:
        Two different names can collapse to the same cleaned name
        (``"a b"`` and ``"a_b"``); this function does not de-duplicate them.
    """
    import re  # local import: this cell does not import ``re`` itself

    # ``-`` is placed last so it is a literal dash, not a ``9-_`` range
    # (that range silently whitelisted ``:;<=>?@[\]^``).
    invalid_run = re.compile(r'[^a-zA-Z0-9_-]+')
    # A callable replacement keeps ``replacement_char`` literal even if it
    # contains backslashes.
    cleaned_column_names = [
        invalid_run.sub(lambda _match: replacement_char, name) for name in df.columns
    ]
    return df.toDF(*cleaned_column_names)

#df = clean_column_names(df)

In [None]:
#read and get first five rows
from pyspark.sql import SparkSession

def read_and_get_first_5_rows(df):
    """Return ``df.limit(5)``, i.e. ``df`` restricted to at most five rows."""
    return df.limit(5)

first_5_rows = read_and_get_first_5_rows(df)

In [None]:
#get the length of the dataframe
from pyspark.sql import SparkSession

def get_dataframe_length(df):
    """Return the row count of ``df`` as reported by ``df.count()``."""
    return df.count()

length = get_dataframe_length(df)

In [None]:
#get the info of the dataframe

from pyspark.sql import SparkSession

def info(df):
    """Print the schema of ``df`` by calling ``df.printSchema()``; returns ``None``."""
    df.printSchema()
    return None

# info(df)

In [None]:
#get the dataframe description

from pyspark.sql import SparkSession

def get_dataframe_description(df):
    """Return the summary produced by ``df.describe()``."""
    return df.describe()

#description = get_dataframe_description(df)

In [None]:
#removing duplicate rows
from pyspark.sql import SparkSession

def remove_duplicate_rows(df):
    """Return ``df.dropDuplicates()``; the input object itself is not modified here."""
    deduplicated = df.dropDuplicates()
    return deduplicated

df = remove_duplicate_rows(df)

In [None]:
#get the categorical columns
# Columns whose Spark dtype string is exactly "string" are treated as categorical.
categorical_cols = [
    name
    for name, spark_type in df.dtypes
    if spark_type == "string"
]
# describe() over just those columns (the list is empty when there are no string columns).
categorical_description_output = df.select(categorical_cols).describe()
# categorical_description_output.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

In [None]:
#missing values count of each column

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
def get_missing_values_count(df):
    """Return a one-row selection holding the NULL count of every column.

    For each column the ``isNull()`` flag is cast to ``int`` and summed; each
    sum is aliased with the name of the column it was computed from.
    """
    null_counters = []
    for name in df.columns:
        null_flag = col(name).isNull().cast("int")
        null_counters.append(spark_sum(null_flag).alias(name))
    return df.select(*null_counters)

In [None]:
missing_count = get_missing_values_count(df)

In [None]:
missing_count.show()

+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|  b|lstat|medv|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|      0|  0|    0|   0|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+



In [None]:
#handle missing values: drop columns whose null count exceeds the threshold, then impute the nulls that remain
def clean_dataframe(df, null_threshold=0.3, fill_missing=True, median_relative_error=0.01):
    """Drop mostly-null columns and optionally impute the nulls that remain.

    A column is dropped when its null count is greater than
    ``null_threshold * df.count()``. When ``fill_missing`` is true, every
    surviving column that still contains nulls is filled:

    * ``string`` columns with their most frequent non-null value
      (ties are broken by the smallest value so the result is deterministic);
    * numeric columns (``tinyint``, ``smallint``, ``int``, ``bigint``,
      ``float``, ``double``, ``decimal(...)``) with an approximate median.

    Columns of any other type are left untouched.

    Args:
        df: Spark DataFrame to clean.
        null_threshold: fraction of rows above which a column is dropped.
        fill_missing: whether to impute nulls in the remaining columns.
        median_relative_error: ``relativeError`` passed to ``approxQuantile``
            when estimating the median.

    Returns:
        The cleaned DataFrame.
    """
    # Function-scope import so the function works regardless of which
    # notebook cells have been executed.
    from pyspark.sql import functions as F

    # Type names as Spark reports them in ``DataFrame.dtypes``; pandas-style
    # names such as "int64"/"float64" never appear there.
    numeric_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}

    total_rows = df.count()
    threshold = null_threshold * total_rows

    # Count the nulls of every column in a single aggregation.
    if df.columns:
        counts_row = df.select(
            *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in df.columns]
        ).first()
        # SUM over zero rows yields NULL, hence the ``or 0``.
        null_counts = {name: (count or 0) for name, count in zip(df.columns, counts_row)}
    else:
        null_counts = {}

    # Drop columns with too many NULL values.
    columns_to_delete = [name for name in df.columns if null_counts[name] > threshold]
    cleaned_df = df.drop(*columns_to_delete)

    if not fill_missing:
        return cleaned_df

    # Dropping columns does not change the rows, so ``null_counts`` is still
    # valid for the remaining columns.
    fill_values = {}
    for col_name, data_type in cleaned_df.dtypes:
        if null_counts[col_name] == 0:
            continue
        if data_type == 'string':
            most_frequent = (
                cleaned_df.filter(F.col(col_name).isNotNull())
                .groupBy(col_name)
                .agg(F.count(F.lit(1)).alias('_freq'))
                .orderBy(F.desc('_freq'), F.asc(col_name))
                .first()
            )
            if most_frequent is not None:
                fill_values[col_name] = most_frequent[0]
        elif data_type in numeric_types or data_type.startswith('decimal'):
            quantiles = cleaned_df.approxQuantile(col_name, [0.5], median_relative_error)
            # An empty list means there was no non-null value to take a median of.
            if quantiles:
                fill_values[col_name] = quantiles[0]

    if fill_values:
        cleaned_df = cleaned_df.fillna(fill_values)

    return cleaned_df

In [None]:
cleaned_df = clean_dataframe(df)

In [None]:
#returns categorical and quantitative columns

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType

def get_categorical_and_quantitative_columns(df):
    """Partition the column names of ``df`` by whether their type is ``StringType``.

    Returns:
        ``(categorical_columns, quantitative_columns)``: the names whose
        schema data type equals ``StringType()``, and all remaining names.
        Both lists keep the order of ``df.columns``.
    """
    string_type = StringType()
    schema = df.schema
    flagged = [(name, schema[name].dataType == string_type) for name in df.columns]

    categorical_columns = [name for name, is_string in flagged if is_string]
    quantitative_columns = [name for name, is_string in flagged if not is_string]
    return categorical_columns, quantitative_columns

In [None]:
#encoding the categorical variables using label encoding
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

def encode_categorical_variables(df):
    """Replace every string column of ``df`` with a ``StringIndexer`` encoding.

    For each categorical column ``c`` (as reported by
    ``get_categorical_and_quantitative_columns``) a ``StringIndexer`` configured
    with ``handleInvalid="skip"`` is fitted and applied, writing ``c_enc``;
    the original column ``c`` is then dropped.
    """
    string_columns = get_categorical_and_quantitative_columns(df)[0]

    encoded = df
    for name in string_columns:
        fitted_indexer = StringIndexer(
            inputCol=name,
            outputCol=name + "_enc",
            handleInvalid="skip",
        ).fit(encoded)
        encoded = fitted_indexer.transform(encoded).drop(name)

    return encoded

In [None]:
encoded_df = encode_categorical_variables(cleaned_df)

In [None]:
encoded_df.show()

+-------+----+-----+----+------+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|   nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+------+-----+-----+------+---+---+-------+------+-----+----+
|0.62976| 0.0| 8.14|   0| 0.538|5.949| 61.8|4.7075|  4|307|   21.0| 396.9| 8.26|20.4|
|0.22188|20.0| 6.96|   1| 0.464|7.691| 51.8|4.3665|  3|223|   18.6|390.77| 6.58|35.2|
|0.06899| 0.0|25.65|   0| 0.581| 5.87| 69.7|2.2577|  2|188|   19.1|389.15|14.37|22.0|
|0.97617| 0.0|21.89|   0| 0.624|5.757| 98.4| 2.346|  4|437|   21.2|262.76|17.31|15.6|
|0.22876| 0.0| 8.56|   0|  0.52|6.405| 85.4|2.7147|  5|384|   20.9|  70.8|10.63|18.6|
|  2.924| 0.0|19.58|   0| 0.605|6.101| 93.0|2.2834|  5|403|   14.7|240.16| 9.81|25.0|
|0.03578|20.0| 3.33|   0|0.4429| 7.82| 64.5|4.6947|  5|216|   14.9|387.31| 3.76|45.4|
|0.05372| 0.0|13.92|   0| 0.437|6.549| 51.0|5.9604|  4|289|   16.0|392.85| 7.39|27.1|
|0.03961| 0.0| 5.19|   0| 0.515|6.037| 34.5|5.9853|  5

In [None]:
encoded_df.dtypes

[('crim', 'double'),
 ('zn', 'double'),
 ('indus', 'double'),
 ('chas', 'int'),
 ('nox', 'double'),
 ('rm', 'double'),
 ('age', 'double'),
 ('dis', 'double'),
 ('rad', 'int'),
 ('tax', 'int'),
 ('ptratio', 'double'),
 ('b', 'double'),
 ('lstat', 'double'),
 ('medv', 'double')]

In [None]:
#code for regression model Machine Learning
def calculate_rmse(encoded_df, target):
    """Fit a linear regression on ``encoded_df`` and return its test-set RMSE.

    Every column except ``target`` is used as a feature. Rows with a null in
    any feature or in the target are dropped, the data is split 80/20 with a
    fixed seed, a ``LinearRegression`` is fitted on the training part and the
    RMSE is evaluated on the test part.

    Args:
        encoded_df: DataFrame containing the feature columns and ``target``.
        target: name of the label column.

    Returns:
        The RMSE computed by ``RegressionEvaluator`` on the test split.

    Raises:
        ValueError: if ``target`` is not a column of ``encoded_df`` or there
            is no other column to use as a feature.
    """
    # The target name is typed by the user; fail early with a clear message
    # instead of an error from deep inside Spark.
    if target not in encoded_df.columns:
        raise ValueError(
            f"Target column {target!r} not found; available columns: {list(encoded_df.columns)}"
        )
    feature_columns = [name for name in encoded_df.columns if name != target]
    if not feature_columns:
        raise ValueError("At least one feature column besides the target is required.")

    # Drop rows with a null label as well as rows with a null feature.
    prepared_df = encoded_df.dropna(subset=feature_columns + [target])

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    assembled_df = assembler.transform(prepared_df)
    train_data, test_data = assembled_df.randomSplit([0.8, 0.2], seed=123)

    lr = LinearRegression(featuresCol="features", labelCol=target)
    lr_model = lr.fit(train_data)
    predictions = lr_model.transform(test_data)

    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
    return evaluator.evaluate(predictions)

In [None]:
#code for classification model Machine Learning

def calculate_accuracy(encoded_df, target):
    """Fit a logistic regression on ``encoded_df`` and return its test-set accuracy.

    Every column except ``target`` is used as a feature. Rows with a null in
    any feature or in the target are dropped, the data is split 70/30 with a
    fixed seed, a ``VectorAssembler`` + ``LogisticRegression`` pipeline is
    fitted on the training part and accuracy is evaluated on the test part.

    Args:
        encoded_df: DataFrame containing the feature columns and ``target``.
        target: name of the label column.

    Returns:
        The accuracy computed by ``MulticlassClassificationEvaluator``.

    Raises:
        ValueError: if ``target`` is not a column of ``encoded_df`` or there
            is no other column to use as a feature.
    """
    # The target name is typed by the user; fail early with a clear message
    # instead of an error from deep inside Spark.
    if target not in encoded_df.columns:
        raise ValueError(
            f"Target column {target!r} not found; available columns: {list(encoded_df.columns)}"
        )
    feature_columns = [name for name in encoded_df.columns if name != target]
    if not feature_columns:
        raise ValueError("At least one feature column besides the target is required.")

    # Drop rows with a null label as well as rows with a null feature.
    prepared_df = encoded_df.dropna(subset=feature_columns + [target])

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    lr = LogisticRegression(labelCol=target, featuresCol="features")
    pipeline = Pipeline(stages=[assembler, lr])

    training_data, testing_data = prepared_df.randomSplit([0.7, 0.3], seed=42)
    model = pipeline.fit(training_data)
    predictions = model.transform(testing_data)

    evaluator = MulticlassClassificationEvaluator(
        labelCol=target, predictionCol="prediction", metricName="accuracy"
    )
    return evaluator.evaluate(predictions)

In [None]:
#ask the user which kind of model to train
user_input = input("Enter 'classification', 'regression' ").lower()

Enter 'classification', 'regression' regression


In [None]:
# The task typed above selects which model(s) to train; the target column
# name is then asked for separately for each selected task.
ran_any_model = False

if "classification" in user_input:
    # strip() so a stray space around the name does not make the column lookup fail
    target = input("Enter the classification target column name: ").strip()
    classification_accuracy = calculate_accuracy(encoded_df, target)
    print("Classification Accuracy:", classification_accuracy)
    ran_any_model = True

if "regression" in user_input:
    target = input("Enter the regression target column name: ").strip()
    regression_rmse = calculate_rmse(encoded_df, target)
    print("Regression RMSE:", regression_rmse)
    ran_any_model = True

if not ran_any_model:
    # Previously an unrecognised choice did nothing at all, with no feedback.
    print("Unrecognized choice:", repr(user_input), "- expected 'classification' or 'regression'.")

Enter the regression target column name: medv
Regression RMSE: 3.9524689809083613


# AWS GLUE CODE Pre Processing

In [None]:
from pyspark.sql import SparkSession
import boto3
from io import BytesIO
import sys
from awsglue.utils import getResolvedOptions
import matplotlib.pyplot as plt
import json
from pyspark.sql.functions import col, sum as sum_
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_replace, isnan, when, count, countDistinct, mean, stddev
from pyspark.sql.types import StringType, DoubleType, IntegerType
import re
from pyspark.sql.functions import col, isnan, when, count, mean, stddev, approx_count_distinct


def read_target_variable_from_s3(bucket, key):
    """Fetch the S3 object ``bucket``/``key`` and return its text content.

    The object body is decoded as UTF-8 and surrounding whitespace is stripped.
    """
    response = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    raw_bytes = response['Body'].read()
    return raw_bytes.decode('utf-8').strip()


def clean_column_name(column_name):
    """Replace each of the characters `` ,;{}()\\n\\t=`` in ``column_name`` with ``_``.

    Each matching character is replaced individually, so a run of several
    such characters yields the same number of underscores.
    """
    # The character class contains only the ten characters listed above; the
    # invisible zero-width spaces that were pasted into it have been removed.
    return re.sub(r"[ ,;{}()\n\t=]", "_", column_name)


def _missing_condition(column, data_type):
    """Build the "value is missing" predicate for one column.

    NULL always counts as missing. The NaN check is only added for numeric
    and string columns, because ``isnan`` cannot be applied to other types
    such as boolean, date or timestamp.
    """
    from pyspark.sql.types import NumericType

    missing = col(column).isNull()
    if isinstance(data_type, (NumericType, StringType)):
        missing = missing | isnan(col(column))
    return missing


def _to_float(value):
    """Convert an aggregate result to ``float`` for JSON output, keeping ``None``."""
    return None if value is None else float(value)


def main():
    """Glue job entry point: profile a CSV from S3 and upload a JSON summary.

    Steps:
      1. Read the job arguments ``INPUT_PATH``, ``OUTPUT_BUCKET``, ``OUTPUT_PATH``.
      2. Load ``s3://<INPUT_PATH>`` as CSV (header row, inferred schema), drop
         duplicate rows and clean the column names.
      3. Collect per-column statistics (distinct count, missing count and,
         for numeric columns, mean and standard deviation).
      4. Handle missing values column by column: when at most 30% of the rows
         are missing, those rows are dropped; otherwise numeric columns are
         filled with an approximate median and string columns with their
         most frequent non-null value.
      5. Upload the summary as JSON to ``OUTPUT_BUCKET`` under the key
         ``OUTPUT_PATH + <input file name> + '_summary.json'``.

    The Spark session is stopped even when a step fails.
    """
    from pyspark.sql.types import NumericType

    # Share of missing rows up to which rows are dropped instead of imputed.
    missing_ratio_limit = 0.3

    # Initialize Spark session
    spark = SparkSession.builder.appName("DataFrameAnalysis").getOrCreate()

    try:
        # Fetch arguments passed to the Glue job
        args = getResolvedOptions(sys.argv, ['INPUT_PATH', 'OUTPUT_BUCKET', 'OUTPUT_PATH'])
        input_path = args['INPUT_PATH']
        output_bucket = args['OUTPUT_BUCKET']
        output_path = args['OUTPUT_PATH']

        # Loading a target variable from S3 (read_target_variable_from_s3) is
        # currently disabled; the job does not use a target variable.

        # Read CSV file from S3
        df = spark.read.csv(f"s3://{input_path}", header=True, inferSchema=True)

        # Remove duplicates
        df = df.dropDuplicates()

        # Clean column names
        df = df.toDF(*[clean_column_name(c) for c in df.columns])

        # Descriptive statistics
        total_rows = df.count()
        json_data = {'total_rows': total_rows}
        column_stats = {}

        # Identifying categorical and quantitative columns
        categorical_columns = []
        quantitative_columns = []
        column_types = {field.name: field.dataType for field in df.schema.fields}

        for column in df.columns:
            column_type = column_types[column]
            # Any numeric type counts as quantitative (not only double/int),
            # so e.g. bigint columns are profiled and imputed too.
            is_numeric = isinstance(column_type, NumericType)

            if isinstance(column_type, StringType):
                categorical_columns.append(column)
            elif is_numeric:
                quantitative_columns.append(column)

            column_data = {
                'distinct_count': df.select(column).distinct().count(),
                'missing_values': df.filter(_missing_condition(column, column_type)).count(),
            }

            if is_numeric:
                # One job for both aggregates instead of two.
                stats_row = df.select(mean(col(column)), stddev(col(column))).first()
                column_data['mean'] = _to_float(stats_row[0])
                column_data['stddev'] = _to_float(stats_row[1])

            column_stats[column] = column_data

        json_data['column_stats'] = column_stats
        json_data['categorical_columns'] = categorical_columns
        json_data['quantitative_columns'] = quantitative_columns

        # Handling missing values
        if total_rows > 0:
            for column in df.columns:
                missing = _missing_condition(column, column_types[column])
                # Counted on the current frame, which earlier iterations may
                # already have filtered; the ratio still uses the original
                # row count, as before.
                missing_count = df.filter(missing).count()
                if missing_count == 0:
                    continue

                if missing_count / total_rows <= missing_ratio_limit:
                    # Few missing values: drop the affected rows, including
                    # NaN rows, which are counted as missing above.
                    df = df.filter(~missing)
                elif column in quantitative_columns:
                    # Many missing values: impute with the approximate median.
                    quantiles = df.approxQuantile(column, [0.5], 0.05)
                    # Empty when there is no usable value to take a median of.
                    if quantiles:
                        df = df.na.fill({column: quantiles[0]})
                elif column in categorical_columns:
                    # Many missing values: impute with the mode. NULLs are
                    # excluded first, otherwise NULL itself can be the most
                    # frequent "value" and na.fill rejects it.
                    mode_row = (
                        df.filter(col(column).isNotNull())
                        .groupBy(column)
                        .count()
                        .orderBy('count', ascending=False)
                        .first()
                    )
                    if mode_row is not None:
                        df = df.na.fill({column: mode_row[0]})

            # Update total rows after handling missing values
            json_data['rows_after_handling_missing'] = df.count()

        # NOTE(review): only the JSON summary is persisted; the cleaned
        # DataFrame itself is not written anywhere by this job.

        # Save JSON result to S3
        s3_client = boto3.client('s3')
        json_key = output_path + input_path.split('/')[-1] + '_summary.json'
        s3_client.put_object(Bucket=output_bucket, Key=json_key, Body=json.dumps(json_data, indent=4))
    finally:
        # Stop the Spark session
        spark.stop()


if __name__ == "__main__":
    main()


#AWS Lambda function for results

In [None]:
import boto3
from botocore.exceptions import ClientError
import json



def lambda_handler(event, context):
    """Return the stored analysis result for the upload identified by ``id``.

    The id is read from ``event['id']`` or, failing that, from
    ``event['queryStringParameters']['id']``. The first S3 object whose key
    starts with the results prefix followed by the id is fetched; a key
    ending in ``.json`` is parsed so the body carries the JSON document
    rather than a JSON-encoded string.

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:
        400 when the id is missing or blank, 404 when no object matches,
        500 on an S3 client error or an unparseable ``.json`` object,
        200 with the file content otherwise.
    """
    event = event or {}
    unique_id = event.get('id')
    if unique_id is None:
        # Fallback for events that carry the query string as a nested mapping.
        unique_id = (event.get('queryStringParameters') or {}).get('id')

    # A blank id would turn the lookup into "first object under results/",
    # handing back some other upload's result, so it is rejected as missing.
    if unique_id is None or not str(unique_id).strip():
        return {
            'statusCode': 400,
            'body': json.dumps('Missing unique ID parameter.')
        }
    unique_id = str(unique_id).strip()

    s3 = boto3.client('s3')
    bucket_name = 'mycloudcomputingproject'
    # NOTE(review): Prefix is matched against object KEYS, so this only finds
    # results whose key literally starts with "s3://mycloudcomputingproject/results/".
    # That is the case if the Glue job's OUTPUT_PATH is passed as a full S3 URI;
    # confirm, and switch to f'results/{unique_id}' if the keys are plain paths.
    results_prefix = f's3://mycloudcomputingproject/results/{unique_id}'

    try:
        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=results_prefix)
        contents = response.get('Contents') or []
        if not contents:
            return {
                'statusCode': 404,
                'body': json.dumps('No results found for the provided ID.')
            }

        object_key = contents[0]['Key']
        file = s3.get_object(Bucket=bucket_name, Key=object_key)
        file_content = file['Body'].read().decode('utf-8')

        # Parse JSON content if the file is a JSON file
        if object_key.endswith('.json'):
            try:
                file_content = json.loads(file_content)
            except ValueError as e:
                return {
                    'statusCode': 500,
                    'body': json.dumps(f"Result file is not valid JSON: {str(e)}")
                }

        return {
            'statusCode': 200,
            'body': json.dumps(file_content)
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error fetching results: {str(e)}")
        }

#AWS lambda Code for uploading

In [None]:
import boto3
import json



def lambda_handler(event, context):
    """Return a presigned S3 ``put_object`` URL for the requested file name.

    Query parameters (read from ``event['queryStringParameters']``):
        filename: object key the upload will be stored under (required).
        targetVariable: read but currently unused (optional).

    Returns:
        A dict with ``statusCode``, CORS/content-type ``headers`` and a JSON
        ``body``: ``{'url': ...}`` with status 200, or ``{'error': ...}`` with
        status 400 when ``filename`` is missing or empty.
    """
    response_headers = {
        'Access-Control-Allow-Origin': '*',
        'Content-Type': 'application/json'
    }

    # queryStringParameters may be absent or None when the request carries no
    # query string; treat both as "no parameters" instead of raising.
    query_params = (event or {}).get('queryStringParameters') or {}

    file_key = query_params.get('filename')
    if not file_key:
        return {
            'statusCode': 400,
            'headers': response_headers,
            'body': json.dumps({'error': "Missing required query parameter 'filename'."})
        }

    # Only needed by the disabled persistence step below.
    target_variable = query_params.get('targetVariable')

    s3 = boto3.client('s3')
    bucket_name = 'mycloudcomputingproject'

    # Store the target variable in S3
    # s3.put_object(Bucket=bucket_name, Key=f"{file_key}_target_variable", Body=target_variable)

    # NOTE(review): file_key is caller-controlled, so the presigned URL allows
    # writing to ANY key in the bucket (including existing results). Consider
    # restricting it to a dedicated upload prefix.
    presigned_url = s3.generate_presigned_url(
        'put_object',
        Params={'Bucket': bucket_name, 'Key': file_key},
        ExpiresIn=3600
    )

    return {
        'statusCode': 200,
        'headers': response_headers,
        'body': json.dumps({'url': presigned_url})
    }

#HTML Code

In [None]:
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Upload CSV/Excel File</title>
    <style>
        /* CSS for loading indicator */
        #loading-indicator {
            display: none;
            font-size: 16px;
            color: gray;
        }
    </style>
</head>

<body>
    <h2>Option 1 - Choose a CSV/Excel file</h2>
    <label for="file-input" style="cursor: pointer;">
        Choose file <span id="file-name">No file chosen</span>
    </label>
    <input type="file" id="file-input" accept=".csv, .xls, .xlsx" style="display: none;">
    <button id="upload-button" disabled>Upload</button>

    <!-- Dropdown for CSV column headings -->
    <div id="csv-dropdown-container" style="display: none;">
        <label for="csv-column-dropdown">Select Column:</label>
        <select id="csv-column-dropdown"></select>
    </div>

    <div id="results-container">
        <h2>Results</h2>
        <div id="results"></div>
        <div id="loading-indicator">Loading results...</div> <!-- Loading indicator -->

        <!-- Display selected column message -->
        <div id="selected-column-message"></div>
    </div>

    <script>
        const fileInput = document.getElementById("file-input");
        const fileNameDisplay = document.getElementById("file-name");
        const uploadButton = document.getElementById("upload-button");
        const csvDropdownContainer = document.getElementById("csv-dropdown-container");
        const csvColumnDropdown = document.getElementById("csv-column-dropdown");
        const loadingIndicator = document.getElementById("loading-indicator");
        const selectedColumnMessage = document.getElementById("selected-column-message");

        function generateUniqueId() {
            // Current time in milliseconds since the epoch, as a decimal string.
            return String(Date.now());
        }

        /**
         * Read the first line of `file` as text and fill the column dropdown
         * with one option per comma-separated heading (option value = column
         * index, option text = heading), then show the dropdown container.
         *
         * Headings are cleaned before use: a leading byte-order mark is
         * removed, both "\n" and "\r\n" line endings are accepted, and
         * surrounding whitespace and one pair of enclosing double quotes are
         * stripped. Quoted headings that themselves contain commas are not
         * supported by this simple split.
         */
        function showCsvDropdown(file) {
            const reader = new FileReader();
            reader.onload = function (e) {
                // Drop a UTF-8 BOM so it does not end up in the first heading.
                const content = String(e.target.result).replace(/^\uFEFF/, '');
                // Split on CRLF as well as LF; otherwise the last heading of a
                // Windows-style file keeps a trailing "\r".
                const headerLine = content.split(/\r?\n/, 1)[0];
                const columnHeadings = headerLine
                    .split(',')
                    .map(heading => heading.trim().replace(/^"(.*)"$/, '$1'));

                csvColumnDropdown.innerHTML = "";
                columnHeadings.forEach((heading, index) => {
                    const option = document.createElement("option");
                    option.value = index;
                    option.text = heading;
                    csvColumnDropdown.add(option);
                });

                csvDropdownContainer.style.display = "block";
            };
            reader.readAsText(file);
        }

        /**
         * Upload `file` to S3 in two steps: request a presigned URL for the
         * key `<uniqueId>_<file.name>` from the upload API, then PUT the file
         * to that URL. On a successful PUT, polling for results is started.
         * Failures of either step are logged to the console.
         */
        function uploadFileToS3(file, uniqueId, targetVariable) {
            const modifiedFileName = `${uniqueId}_${file.name}`;
            fetch(`https://ytu7o2xxgd.execute-api.us-east-2.amazonaws.com/prod/uploading?filename=${encodeURIComponent(modifiedFileName)}&targetVariable=${encodeURIComponent(targetVariable)}`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                }
            })
                .then(response => {
                    // Without this check an error response would be parsed as
                    // if it contained a URL.
                    if (!response.ok) {
                        throw new Error(`Presigned URL request failed: ${response.status} ${response.statusText}`);
                    }
                    return response.json();
                })
                .then(data => {
                    const presignedUrl = data && data.url;
                    if (!presignedUrl) {
                        // fetch(undefined) would request the relative URL "undefined".
                        throw new Error('Presigned URL missing from response.');
                    }
                    return fetch(presignedUrl, {
                        method: 'PUT',
                        body: file
                    })
                        .then(response => {
                            if (response.ok) {
                                console.log("File uploaded successfully.");
                                pollForResults(uniqueId, 0); // Start polling for results
                            } else {
                                console.error('Error during file upload:', response);
                            }
                        })
                        .catch(uploadError => console.error('Error during file upload:', uploadError));
                })
                .catch(fetchError => console.error('Error fetching presigned URL:', fetchError));
        }

        // Start an upload for the chosen file, tagging it with a fresh id and
        // the text of the currently selected column option.
        uploadButton.addEventListener("click", () => {
            const selectedFile = fileInput.files[0];
            if (!selectedFile) {
                return;
            }
            const uniqueId = generateUniqueId();
            // The dropdown is filled asynchronously after the file is read, so
            // it can still be empty here; fall back to an empty column name
            // instead of throwing on `undefined.text`.
            const selectedOption = csvColumnDropdown.selectedOptions[0];
            const selectedColumnValue = selectedOption ? selectedOption.text : "";
            uploadFileToS3(selectedFile, uniqueId, selectedColumnValue);
        });

        /**
         * Poll the results API for `uniqueId`. `attempts` is the number of
         * polls already made; while it is below the maximum, a 404 payload or
         * a failed request schedules another poll after 10 seconds. Any other
         * payload is shown as pretty-printed JSON.
         */
        function pollForResults(uniqueId, attempts) {
            const maxAttempts = 30;
            const retryDelayMs = 10000; // Wait for 10 seconds before retrying
            const resultsContainer = document.getElementById("results");
            const apiGatewayUrl = `https://8ygx8j5fxd.execute-api.us-east-2.amazonaws.com/prod/result?id=${uniqueId}`;

            // Show the final text and hide the loading indicator.
            const finish = (text) => {
                resultsContainer.textContent = text;
                loadingIndicator.style.display = "none";
            };

            // Schedule the next poll, or give up with `giveUpText` once the
            // attempt budget is used up.
            const retryOrGiveUp = (giveUpText) => {
                if (attempts < maxAttempts) {
                    setTimeout(() => {
                        pollForResults(uniqueId, attempts + 1);
                    }, retryDelayMs);
                } else {
                    finish(giveUpText);
                }
            };

            loadingIndicator.style.display = "block";
            resultsContainer.textContent = "";

            fetch(apiGatewayUrl)
                .then(response => {
                    if (!response.ok) {
                        throw new Error(`Response not OK: ${response.statusText}`);
                    }
                    return response.json();
                })
                .then(data => {
                    if (data.statusCode !== 404) {
                        finish(JSON.stringify(data, null, 2));
                    } else {
                        retryOrGiveUp('No results found after maximum attempts.');
                    }
                })
                .catch(error => {
                    console.error('Error:', error);
                    retryOrGiveUp('Error fetching results after maximum attempts.');
                });
        }

        // Keep the file-name label, the upload button and the column dropdown
        // in sync with the current file selection.
        fileInput.addEventListener("change", () => {
            const chosenFile = fileInput.files[0];

            if (!chosenFile) {
                // Selection cleared: reset the UI to its initial state.
                fileNameDisplay.textContent = "No file chosen";
                uploadButton.disabled = true;
                csvDropdownContainer.style.display = "none";
                return;
            }

            fileNameDisplay.textContent = chosenFile.name;
            uploadButton.disabled = false;
            showCsvDropdown(chosenFile);
        });
    </script>
</body>

</html>


#End of the Code