# Architectures For Big Data
## First Challenge

### Valerio Cislaghi (mat. 982330)
### Leonardo Menti (mat. 982296)

In [3]:
import pandas as pd
from operator import add
#from pyspark import SparkContext
#sc = SparkContext()

In [4]:
# CSV column used to group the rows, and CSV column holding the number to aggregate.
key, value = 'CompID', 'Milliseconds'

In [6]:
path = './logDataset.csv'

# Load the CSV as an RDD of raw text lines; its first line is the header.
# NOTE(review): `sc` is assumed to be a SparkContext supplied by the environment.
raw_lines = sc.textFile(path)
header = raw_lines.first()

# Drop the header by comparing every line against it. This checks all the rows
# (not only the first one), so a data line identical to the header is removed too.
rdd = raw_lines.filter(lambda line: line != header)

### MEDIAN

In [7]:
def getItemMedian(keys, key, value):
    """Build a parser that turns one CSV line into a ``(key, [value])`` pair.

    Args:
        keys: list of column names, in the order they appear in a line.
        key: name of the column whose field becomes the pair's key.
        value: name of the column whose field is converted to ``float``.

    Returns:
        A function taking a comma-separated line and returning
        ``(key_field, [float(value_field)])``. The value is wrapped in a
        one-element list so that lists can later be concatenated per key.

    Raises:
        ValueError: if ``key`` or ``value`` is not in ``keys``; the returned
            parser also raises it when the value field is not a valid float.
    """
    key_pos, value_pos = keys.index(key), keys.index(value)

    def parse_line(line):
        fields = line.split(',')
        return fields[key_pos], [float(fields[value_pos])]

    return parse_line

def calcMean(row):
    """Return the median of the sorted values collected for one key.

    NOTE: despite its name this helper computes the *median*; the name is
    kept because the median pipeline refers to it as ``calcMean``.

    Args:
        row: a ``(key, values)`` pair where ``values`` is a non-empty list of
            numbers already sorted in ascending order.

    Returns:
        ``{"key": key, "median": median}``. For an odd number of values the
        median is the central one; for an even number it is the average of
        the two central values.

    Raises:
        IndexError: if ``values`` is empty.
    """
    key, values = row[0], row[1]
    mid = len(values) // 2
    if len(values) % 2:
        median = values[mid]
    else:
        # Even count: there is no single central element, so average the two
        # around the middle instead of arbitrarily picking the upper one.
        median = (values[mid - 1] + values[mid]) / 2
    return {"key": key, "median": median}

# Median per key:
# 1. we delete the rows with an empty key or an empty value; this is done on the
#    raw text lines because getItemMedian calls float() on the value, which
#    would raise on an empty field before any later filter could discard it
# 2. we use getItemMedian to format the data as (key, [value])
# 3. we concat the value lists of the elements with the same key
# 4. we sort each list
# 5. we compute the median and take a sample of 15 elements with a lazy approach

def hasKeyAndValue(keys, key, value):
    """Build a predicate that is True when a CSV line has non-empty key and value fields."""
    key_i = keys.index(key)
    value_i = keys.index(value)
    def hasKeyAndValueAux(x):
        row = x.split(',')
        return row[key_i] != '' and row[value_i] != ''
    return hasKeyAndValueAux

median = rdd\
    .filter(hasKeyAndValue(header.split(','), key, value))\
    .map(getItemMedian(header.split(','), key, value))\
    .reduceByKey(list.__add__)\
    .mapValues(sorted)\
    .map(calcMean)\
    .take(15)

median

[{'key': '076333', 'median': 611.0},
 {'key': '038137', 'median': 547.0},
 {'key': '031989', 'median': 552.0},
 {'key': '022692', 'median': 605.0},
 {'key': '009780', 'median': 128.0},
 {'key': '058073', 'median': 363.0},
 {'key': '072699', 'median': 373.0},
 {'key': '042031', 'median': 421.0},
 {'key': '065296', 'median': 827.0},
 {'key': '054531', 'median': 507.0},
 {'key': '089836', 'median': 659.0},
 {'key': '083436', 'median': 607.0},
 {'key': '050279', 'median': 455.0},
 {'key': '084215', 'median': 905.0},
 {'key': '026654', 'median': 747.0}]

### MEAN

In [8]:
def getItemMean(keys, key, value):
    """Build a parser that turns one CSV line into a ``(key, (value, 1))`` pair.

    Args:
        keys: list of column names, in the order they appear in a line.
        key: name of the column whose field becomes the pair's key.
        value: name of the column whose field is converted to ``float``.

    Returns:
        A function taking a comma-separated line and returning
        ``(key_field, (float(value_field), 1))``. The trailing ``1`` is a
        per-row counter, so summing the tuples element-wise per key yields
        ``(total, count)``.

    Raises:
        ValueError: if ``key`` or ``value`` is not in ``keys``; the returned
            parser also raises it when the value field is not a valid float.
    """
    key_pos, value_pos = keys.index(key), keys.index(value)

    def parse_line(line):
        fields = line.split(',')
        return fields[key_pos], (float(fields[value_pos]), 1)

    return parse_line

def calcMean(row):
    """Turn a ``(key, (total, count))`` pair into ``{"key": key, "mean": total / count}``.

    Raises:
        ZeroDivisionError: if ``count`` is zero.
    """
    key, stats = row[0], row[1]
    return {"key": key, "mean": stats[0] / stats[1]}

# Mean per key:
# 1. we delete the rows with an empty key or an empty value; this is done on the
#    raw text lines because getItemMean calls float() on the value, which
#    would raise on an empty field before any later filter could discard it
# 2. we use getItemMean to format the data as (key, (value, 1))
# 3. we sum with a reduce function all the values like (x1, x2) (y1, y2) -> (x1 + y1, x2 + y2)
# 4. we calculate the mean like (tot, count) -> tot/count
# 5. and finally we take a sample of 15 elements with a lazy approach

def hasKeyAndValue(keys, key, value):
    """Build a predicate that is True when a CSV line has non-empty key and value fields."""
    key_i = keys.index(key)
    value_i = keys.index(value)
    def hasKeyAndValueAux(x):
        row = x.split(',')
        return row[key_i] != '' and row[value_i] != ''
    return hasKeyAndValueAux

mean = rdd\
    .filter(hasKeyAndValue(header.split(','), key, value))\
    .map(getItemMean(header.split(','), key, value))\
    .reduceByKey(lambda acc, it: (acc[0] + it[0], acc[1] + it[1]))\
    .map(calcMean)\
    .take(15)

mean

[{'key': '076333', 'mean': 506.6},
 {'key': '038137', 'mean': 578.4},
 {'key': '031989', 'mean': 543.6},
 {'key': '022692', 'mean': 623.0},
 {'key': '009780', 'mean': 194.4},
 {'key': '058073', 'mean': 405.6},
 {'key': '072699', 'mean': 384.8},
 {'key': '042031', 'mean': 493.2},
 {'key': '065296', 'mean': 674.8},
 {'key': '054531', 'mean': 609.0},
 {'key': '089836', 'mean': 636.4},
 {'key': '083436', 'mean': 689.6},
 {'key': '050279', 'mean': 540.4},
 {'key': '084215', 'mean': 685.8},
 {'key': '026654', 'mean': 626.6}]

22/02/01 22:26:57 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 4063561 ms exceeds timeout 120000 ms
22/02/01 22:26:57 WARN SparkContext: Killing executors is not supported by current scheduler.


### TEST

In [None]:
# compares the samples obtained with Spark against the mean/median calculated
# with pandas, to check the correctness of the program
import math

data = pd.read_csv('./logDataset.csv', dtype={key: str, value: float})
data = data[[key, value]]

# aggregate once per statistic instead of filtering and grouping the whole
# frame again for every sampled key
pd_means = data.groupby(key)[value].mean()
pd_medians = data.groupby(key)[value].median()

# mean test
for row in mean:
    spark_mean = row['mean']
    # look the key up by label: positional `[0]` on a string-indexed Series is deprecated
    pd_mean = pd_means.loc[row['key']]
    # tolerate floating-point rounding: the two libraries are not guaranteed
    # to accumulate the values in the same order
    if not math.isclose(pd_mean, spark_mean):
        raise Exception(f"pandas mean is different from spark mean {pd_mean} != {spark_mean}")

# median test
for row in median:
    spark_median = row['median']
    pd_median = pd_medians.loc[row['key']]
    if not math.isclose(pd_median, spark_median):
        raise Exception(f"pandas median is different from spark median {pd_median} != {spark_median}")

print("**** CORRECT ****")

In [None]:
# mean of the value column for every key, computed with pandas over the whole
# dataset (reference result; `data` only holds the key and value columns)
data.groupby(key).mean()

In [None]:
# median of the value column for every key, computed with pandas over the whole
# dataset (reference result; `data` only holds the key and value columns)
data.groupby(key).median()