<h1>Map Reduce</h1>

In [1]:
# from mrjob.job import MRJob
# from mrjob.step import MRStep
import pandas as pd
import numpy as np
from sklearn.tree import DecisionTreeClassifier
import threading
from random import shuffle
from collections import Counter
from threading import Lock


In [2]:

class classifier:
    def __init__(self, input_file, output_dir, n_mappers, n_reducers, combiner, preprocess, sub_classifier_config):
        self.input_file = input_file
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        self.combiner = combiner
        self.preprocess = preprocess
        self.config = sub_classifier_config
    
    def preprocess_data(self, df: pd.DataFrame):
        self.labels: list = df["current shot outcome"].unique().tolist() 
        df["current shot outcome"] = df["current shot outcome"].apply(lambda x: self.labels.index(x))
        return df
        
        
    def split_n_shuffle(self, file = None):

        if self.input_file:
            try:
                df = pd.read_csv(self.input_file)
            except Exception as e:
                print(e)
                return 
        df = file
        split_size = self.n_mappers
        headers = df.columns
        chunks = np.array_split(df.to_numpy(), split_size)
        shuffle(chunks)
        list_of_df = [pd.DataFrame(data = chunk, columns = headers) for chunk in chunks]
        self.temp_file_splits = []
        for i, chunk_df in enumerate(list_of_df):
            print(f"Chunk {i+1}")
            self.temp_file_splits.append(f"{self.output_dir}/chunk{i}.csv")
            chunk_df.to_csv(f"{self.output_dir}/chunk{i}.csv")
            
        
    def run_mapper(self):
        print("commencing threads with num of threads => ", len(self.temp_file_splits))
        # Pass an iterable of input file paths to starmap
        args = [temp_file for temp_file in self.temp_file_splits]
        # print(args)
        # Use starmap to map the mapper function to each input file
        num_threads =  len(self.temp_file_splits)
        # Create a list to store the threads
        threads = []
        results = []
        lock = Lock()
        # Create and start threads with sample data
        for i in range(num_threads):
            thread = threading.Thread(target=self.mapper, args=(args[i], results, lock))
            thread.start()
            threads.append(thread)
        # Wait for all threads to finish (optional)
        for thread in threads:
            thread.join()
        # print("Done")
        # print("all threads are done...", threads)
        return results
        
    def reducer(self):
        return list([tree[1] for tree in self.trees])
    
    def run_reducer(self):
        return self.reducer()
    
                
    def mapper(self, temp_file, results, lock):
        print("thread started mapping on file => ", temp_file)
        df = pd.read_csv(temp_file)
        # if self.preprocess:
        #   df = self.preprocess_data(df) 
        # print(df) 
        tree = DecisionTreeClassifier(criterion = self.config['criterion'], random_state = self.config['random_state'], max_depth = self.config['max_depth'])
        tree.fit(df.drop(self.config["target"], axis = 1), df[self.config["target"]])
        # print("thread with file %s is done", temp_file)
        with lock:
            results.append((None, tree))
        
        
    def run(self, file):
        print("commencing splitting and shuffling...")
        self.split_n_shuffle(file)
        print("splitting and shuffling is done, commencing mapping...")
        self.trees = self.run_mapper()
        print("mapping is done, commencing reduction...")
        self.trees = self.run_reducer()
        print("all done...")
        
    def predict(self, X):
        predictions = []
        for tree in self.trees:
            predictions.append(tree.predict(X))
        predictions = np.array(predictions)
        final_predictions = []
        for i in range(predictions.shape[1]):
            final_predictions.append(Counter(predictions[:, i]).most_common(1)[0][0])
        return np.array(final_predictions)
        
        


In [3]:
data = {
    'feature1': [1, 2, 3, 4, 5],
    'feature2': [5, 4, 3, 2, 1],
    'target': [0, 1, 0, 1, 0]
}
df = pd.DataFrame(data)

In [4]:
config = {
    "criterion" : "gini",
    "target" : "target",
    "random_state" : 52,
    "max_depth": 5
}
mp = classifier(None, "./data", 5, 1, False, True, config)


In [5]:
mp.run(df)

commencing splitting and shuffling...
Chunk 1
Chunk 2
Chunk 3
Chunk 4
Chunk 5
splitting and shuffling is done, commencing mapping...
commencing threads with num of threads =>  5
thread started mapping on file =>  ./data/chunk0.csv
thread started mapping on file =>  ./data/chunk1.csv
thread started mapping on file =>  ./data/chunk2.csv
thread started mapping on file =>  ./data/chunk3.csv
thread started mapping on file =>  ./data/chunk4.csv
mapping is done, commencing reduction...
all done...


In [7]:
mp.predict([[1,5,1]])



ValueError: X has 2 features, but DecisionTreeClassifier is expecting 3 features as input.