In [9]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import math
import time
import multiprocessing

In [2]:
%matplotlib inline

#read csv file
pandasData = pd.read_csv('BlackFriday.csv')
purchaseData = pandasData['Purchase'] 

#convert pandas datafram into dask datafram
daskData = dd.from_pandas(pandasData, npartitions=5)
purchaseOfBlackFriday = daskData['Purchase']

In [43]:
def calculateMean(data):
    sumOfElement = 0
    for index, value in data.items():
        sumOfElement = sumOfElement + value
        
    mean = sumOfElement/len(data)
    return mean
         
def calculateMeanParallel(data):
    startTime = time.time()
    mean = data.mean().compute()
    return mean, startTime

def calculateStandardDeviation(data):
    sumStd = 0
    mean = calculateMean(data)
    for index, value in data.items():
        sumStd += pow((value-mean),2)
    
    return math.sqrt(sumStd/len(data))

def binarySearch (arr, left, right, x): 
  
    if right >= left: 
  
        mid = left + (right - left)/2
  
        if arr[mid] == x: 
            return mid 
          
        elif arr[mid] > x: 
            return binarySearch(arr, left, mid-1, x) 
  
        else: 
            return binarySearch(arr, mid + 1, right, x) 
  
    else: 
        return -1

def Sequential_Search(dlist, item):

    pos = 0
    found = False
    
    while pos < len(dlist) and not found:
        if dlist[pos] == item:
            found = True
        else:
            pos = pos + 1
    
    return found, pos

In [28]:
start = time.clock()
serialMean = calculateMean(purchaseData)
print(".........Serial Calculation...........\n")
print("Serial Mean is:", serialMean)
print("Time taken to calculate the mean is:", time.clock()-start)

print("\n.........Parallel Calculation.......\n")

start = time.clock()
parallelMean = calculateMeanParallel(purchaseOfBlackFriday)
print("Parallel Mean is:", serialMean)
print("Time taken to calculate the mean is:", time.clock()-start)

.........Serial Calculation...........

('Serial Mean is:', 9333)
('Time taken to calculate the mean is:', 0.28272699999999773)

.........Parallel Calculation.......

('Parallel Mean is:', 9333)
('Time taken to calculate the mean is:', 0.055842000000001946)


In [29]:
#Sorting
start = time.clock()
sortDataSetPandas = pandasData.sort_values(by='Purchase', ascending = True)
print("Serial sorting time is:", time.clock()-start)

start = time.clock()
sortDataSetParallel = purchaseOfBlackFriday.map_partitions(lambda x: x.sort_index())
print("Parallel sorting time is:", time.clock()-start)

('Serial sorting time is:', 0.23843899999999962)
('Parallel sorting time is:', 0.0034090000000013276)


In [91]:
#standard deviation
start = time.clock()
stdDataSetPandas = calculateStandardDeviation(purchaseData)
print("Serial Standard deviation is:", stdDataSetPandas)
print("Serial Standard deviation time is:", time.clock()-start)

start = time.clock()
stdDataSetParallel = purchaseOfBlackFriday.std().compute()
print("Parallel standard deviation is:", stdDataSetParallel)
print("Parallel standard is:", time.clock()-start)

('Serial Standard deviation is:', 4981.017566722688)
('Serial sorting time is:', 0.6310730000000149)
('Parallel standard deviation is:', 4981.022132656476)
('Parallel sorting time is:', 0.05628200000001016)


In [44]:
#search 
x = 8027 
start = time.clock()
print("........Serial Implementation........")
result = binarySearch(np.array(purchaseData.sort_values()), 0, len(purchaseData)-1, x)
if result == -1:
    print("Element not present in the series")
else:
    print("Element not present at index", result)
    
print("Serial Search time using binary search  is:", time.clock()-start)

start = time.clock()
isFound, index = Sequential_Search(np.array(purchaseData.sort_values()), x)
print("Serial Search time using sequential search is:", time.clock()-start)

........Serial Implementation........
('Element not present at index', 263800)
('Serial Search time using binary search  is:', 0.1317109999999957)
('Serial Search time using sequential search is:', 0.30381199999999353)


In [42]:
startTime = time.clock()
x = 8027
arrOfData = np.array(purchaseData.sort_values())
pool = multiprocessing.Pool(processes=3)
isFound, index = pool.apply(sequentialSearch, (arrOfData, x))
pool.close()
print(".......Parallel Implementation.......")
print("parallel Search time is:", time.clock()-startTime)



.......Parallel Implementation.......
('parallel Search time is:', 0.1451980000000006)


In [54]:
#Groupby
startTime = time.clock()
data = pandasData.groupby('City_Category')
print("Serial time", time.clock()-startTime)

startTime = time.clock()
dataDask = daskData.groupby('City_Category')
print("Parallel time", time.clock()-startTime)

('Serial time', 0.0007970000000057098)
('Parallel time', 0.0007450000000090995)
