In [2]:
!pip install pyspark
!pip install mrjob

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=c77b092cae2dc4d4a9bd871b31c552bd189759a06231f13ab5bb9a099586dd2c
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[2K 

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import numpy as np
import pandas as pd 
from sklearn import preprocessing
from pyspark.ml.feature import MinMaxScaler

# # Create a SparkSession
spark = SparkSession.builder.appName("MapReduce Linear Regression").getOrCreate()

# # Load the data as a dataframe
data = spark.read.csv("/content/drive/MyDrive/new.csv", header=True, inferSchema=True)


# Define the input and output columns
inputCols = ['Journey_day','Airline', 'Flight_code', 'Class', 'Source', 'Departure', 'Total_stops',
       'Arrival', 'Destination', 'Duration_in_hours', 'Days_left',
       'Month', 'Day']
outputCol = "Fare"

# Convert the input columns to a vector using VectorAssembler
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
data = assembler.transform(data).select("features", outputCol)

# Initialize the MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features",min = 0, max = 1)

# Compute summary statistics and generate the scaler model
scaler_model = scaler.fit(data)

# Transform the data using the scaler model
data = scaler_model.transform(data).select("scaled_features", outputCol)

# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

sparse_rows = train_data.select("scaled_features").collect()

# Convert the list of SparseVector Row objects to a NumPy array
sparse_array = np.array([row[0].toArray() for row in sparse_rows])
print(sparse_array.shape)

# Define the Map function
def compute_gradient(data_point, weights):
    # Extract the features from the data point as an array
    x = np.array(data_point.scaled_features.toArray())
    # x = preprocessing.normalize([x])
    # # Extract the target variable from the data point
    y = np.array(data_point.Fare)
    
    # # Compute the dot product of the features and weights
    dot_product = sum([(x[i]) * weights[i] for i in range(len(x))])
    # # Compute the difference between the predicted and actual target variables
    error = y - dot_product
    # # Compute the gradient of the cost function with respect to the weights
    gradient = [-2 * error * x[i] for i in range(len(x))]
    # gradient = 2 * np.dot(x.T, error) 

    return gradient



# Define the Reduce function
def reduce_gradients(gradient1, gradient2):
    return [gradient1[i] + gradient2[i] for i in range(len(gradient1))]

# Initialize the weights
weights = np.random.randn(13)
 
Partition_data = train_data.repartition(10)

# Iterate using MapReduce
max_iterations = 20
convergence_tolerance = 0.0001
for i in range(max_iterations):
    gradients = Partition_data.rdd.map(lambda x: compute_gradient(x, weights)).reduce(reduce_gradients)
    print(f"in {i}  the gradients : " , gradients)
    step_size = 0.09
    weights = [(weights[i] - step_size * gradients[i])/sparse_array.shape[0] for i in range(len(weights))]
    print(f"in {i}  the weights : " , weights)
    if sum([abs(gradients[i]) for i in range(len(gradients))]) < convergence_tolerance:
        break



(311807, 13)
in 0  the gradients :  [-7062669226.823181, -9305273460.692623, -11253863852.987904, -2778359244.6612897, -7661050811.372348, -5363126236.147571, -1089661184.6847916, -6413608282.132931, -7752376666.287794, -4248654982.895323, -6871328833.009844, -4077866195.523436, -7296390628.805317]
in 0  the weights :  [2038.5694705859694, 2685.8749593217412, 3248.316251691286, 801.9458572869361, 2211.286384364246, 1548.0132277771659, 314.51990180009057, 1851.2244610951457, 2237.646682923703, 1226.3321473687133, 1983.3409568526763, 1177.035656780967, 2106.0308381467867]
in 1  the gradients :  [-3290234278.3069534, -4218634026.318776, -5618345609.481936, -93280276.82371011, -3672581833.4828944, -2515300348.2431774, -21381762.88422237, -2997321320.043823, -3712000092.9375186, -2230738453.901589, -3098052927.0121484, -1958631031.410076, -3513041762.0614038]
in 1  the weights :  [949.7000504064898, 1217.6755115941887, 1621.690190308832, 26.9269992655431, 1060.061436400866, 726.021479168568

In [9]:
from sklearn.metrics import r2_score,mean_squared_error
# Extract the SparseVector column as a list of Row objects
sparse_rows = test_data.select("scaled_features").collect()

# Convert the list of SparseVector Row objects to a NumPy array
sparse_array = np.array([row[0].toArray() for row in sparse_rows])

Y_predicted = np.dot(sparse_array,weights)
Y_predicted
# Print the resulting NumPy array
print(abs(Y_predicted))
target_rows = test_data.select("Fare").collect()
target_rows
target_array = np.array([row.Fare for row in target_rows])

rmse = np.sqrt(np.mean((target_array - Y_predicted)**2))

# Print the RMSE
print("RMSE:", rmse)


[ 6442.55250893  6741.17507253  6257.88824262 ... 10597.09192985
  9274.25113711  8142.57121637]
RMSE: 25231.505674820837


In [None]:
import sys
import time

import numpy as np
from pyspark.sql import SparkSession


def parseVector(line, split):
    '''
    Input:
        - line: String read from a file
        - split: Parameter to split the string
    
    Returns numpy array of each record with float values
    '''
    return np.array([float(x) for x in line.split(split)])


def euclideanDistance(test, train):
    '''
    Input:
        - test: Numpy Array with label as last value
        - train: Numpy Array with label as last value
    
    Returns euclidean distance between test and train arrays
    '''
    return int(train[-1]), np.sum((test[:-1] - train[:-1]) ** 2)


if __name__ == "__main__":


    spark = SparkSession.builder.appName("Map Reduce KNN").getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    input_file = spark.read.csv("/content/drive/MyDrive/Normalized_data.csv", header=True, inferSchema=True)
    input_data = input_file.rdd.map(lambda x: np.array(x))

    # Split data into training and testing sets
    training_data, test_data = input_data.randomSplit([0.99, 0.01], seed=47)

    K = 1

    start_time = time.time()

    predict_labels = []
    true_labels=[]
    for test_point in test_data.collect():
        
        true_label = int(test_point[-1])

        distances = training_data.map(
            lambda train_point: euclideanDistance(test_point, train_point))

        k_nearest_neighbours = sc.parallelize(
            distances.takeOrdered(K, key = lambda p: p[1])).map(
                lambda x: (x[0], 1))

        k_nearest_predictions = k_nearest_neighbours.reduceByKey(
            lambda x1, x2: x1 + x2)

        predict_label = k_nearest_predictions.takeOrdered(1,
            key = lambda x: -x[1])[0][0]

        predict_labels.append(predict_label)
        true_labels.append(true_label)

    end_time = time.time()
    RMSE=np.sqrt(np.mean((true_labels-predict_labels)**2))
    time_taken = end_time - start_time

    print ("\nRMSE: " + str(RMSE) + "%\n")
    print ("\nTime taken: " + str(time_taken) + "\n")


In [11]:
    end_time = time.time()
    print(len(predict_labels))
    error = 0
    for i in range(len(predict_labels)):
       error += (true_labels[i]-predict_labels[i])**2
    RMSE=np.sqrt(np.mean(error))
    time_taken = end_time - start_time

    print ("\nRMSE: " + str(RMSE) + "%\n")
    print ("\nTime taken: " + str(time_taken) + "\n")

712

RMSE: 176406.579035477%


Time taken: 10193.650690793991

