# treeAggregate
Create a treeAggregate function that will computer maximum and minimum on an RDD in one pass.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext("local")
spark = SparkSession.builder.getOrCreate()

## Load the data

In [2]:
testFile= "data\\taxi-data-sorted-verysmall-header.csv"
df = spark.read.format('csv').options(header='true', inferSchema='true',  sep =",").load(testFile)
df.show(5, truncate=True)

+--------------------+--------------------+-------------------+-------------------+---------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+-------------------+-------------------+---------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|07290D3599E7A0D62...|E7750A37CAB07D0DF...|2013-01-01 00:00:00|2013-01-01 00:02:00|      120|         0.44|      -73.956528|      40.716976|        -73.96244|       40.715008|         CSH|        3.5|      0.5|    0.5|       0.0|   

# Custom treeAggregate functions
Create aggregate functions that will find the key that has the maximum value and the key that has the minimum value in an RDD in one pass.

In [3]:
# example of function input (x = ("k1",-10000,"k2",1000000) , y = ('07', 165))
def seqOp(x,y):
    # res = (max_key, max_val, min_key, min_val)
    res = [0,0,0,0]
    if x[1] > y[1]:
        # it the current x values are bigger than the y then keep them
        res[0] = x[0]
        res[1] = x[1]
    else:
        # y is bigger, so use y as a maximum
        res[0] = y[0]
        res[1] = y[1]
    if x[3] < y[1]:
        # if the current x values are smaller than the y then keep them
        res[2] = x[2]
        res[3] = x[3]
    else:
        # y is smaller, so use y as a minimum
        res[2] = y[0]
        res[3] = y[1]
    return res
  
# example of function input (x = ('14',217, '49',70 ) , y = ('13',220, '50',70 ))
def combOp(x,y):
    # res = (max_key, max_val, min_key, min_val)
    res = [0,0,0,0]
    if x[1] > y[1]:
        # if the data in the x is bigger, then use x as the result
        res[0] = x[0]
        res[1] = x[1]
    else:
        res[0] = y[0]
        res[1] = y[1]
    if x[3] < y[3]:
        # if the data in the x is smaller, then use x as the result
        res[2] = x[2]
        res[3] = x[3]
    else:
        res[2] = y[2]
        res[3] = y[3]
    return res

### Testing the aggregate function

In [4]:
print(seqOp(["k1",-1000000,"k2",1000000],("k3",5)))
print(seqOp(["k1",20,"k2",10],("k3",5)))
print(seqOp(["k1",20,"k2",10],("k3",25)))

['k3', 5, 'k3', 5]
['k1', 20, 'k3', 5]
['k3', 25, 'k2', 10]


In [5]:
print(combOp(["k1",20,"k2",10],["k3",25,"k4",15]))
print(combOp(["k1",20,"k2",10],["k3",25,"k4",5]))

['k3', 25, 'k2', 10]
['k3', 25, 'k4', 5]


### Create dataset for aggregation

In [6]:
""" Define the initial value for the aggregation, a tuple of (max_key, max_val, min_key, min_val)
    We put the big negative number for the max_val and big positive number for the min_val
    and some arbitrary keys for the max_key and min_key
"""
agg_zero_val = ["k1",-1000000,"k2",1000000]

In [7]:
# Find the taxi vehicle with maximum and taxi vehicle minimum number of trips
# The md5sum of the identifier of the taxi is used as a taxi vehicle identifier
r1 = df.groupBy('medallion').count().rdd.map(tuple)
r1.treeAggregate(agg_zero_val, seqOp, combOp)


['696321779D687411F2E5DF6991E9D474', 7, '6D9C2E4EAC8F6A5C7D3102177BC42C03', 1]

In [8]:
# Find the driver with maximum and driver with minimum number of trips
# The md5sum of the identifier for the taxi license (Driver ID) is used as a driver identifier
r2 = df.groupBy('hack_license').count().rdd.map(tuple)
r2.treeAggregate(agg_zero_val, seqOp, combOp)


['00B7691D86D96AEBD21DD9E138F90840', 10, 'D8B109DC861AA892745CC4CFF78D98E3', 1]

# Simulation of the Aggregate operation in Spark
We will take the data from RDD and use it as the input in the Spark Aggregate simulation.

The simulation will be done using the following steps:

- We will split the data into partitions, 
- For each partition, we will simulate what is happening on the worker nodes.
- At the master node, we will use all partition sub-aggregates to combine them in the final aggregate, which is the result of the aggregate function.

In [9]:
r = df.groupBy('hack_license').count().rdd.map(tuple)
r.take(10)

[('130328475AD7427AFDE50A846CA08B22', 1),
 ('D4F2AE0988ECB2E421AAC0C876483801', 3),
 ('DD97899ACAC51EF3188A659DB1F4EDBB', 5),
 ('A7C47E60941315A0E1B18190584F1B8F', 4),
 ('9911D66A4A796752DAA9929262692322', 3),
 ('88CB7A1006DB184386777ACF070430A9', 4),
 ('069B5562096AF76848A613F23073B4BA', 2),
 ('28A7C858D9231A3EC2C90820A26083DC', 2),
 ('A7EE9AEDB7325F55F14F2D2448170D56', 2),
 ('4B6EFCBC110DB539E9ECCD320DB55ADC', 3)]

In [10]:
data = r.collect()
print(len(data))

4628


### Partition the data
We will create partitions based on a predefined partition_size parameter.

In [11]:
# Partition the data
partition_size = 1000
# Create partitions with size partition_size
partitions = [data[i:i + partition_size] for i in range(0, len(data), partition_size)]
# Print the number of created partitions
print(len(partitions))

5


In [12]:
partitions

[[('130328475AD7427AFDE50A846CA08B22', 1),
  ('D4F2AE0988ECB2E421AAC0C876483801', 3),
  ('DD97899ACAC51EF3188A659DB1F4EDBB', 5),
  ('A7C47E60941315A0E1B18190584F1B8F', 4),
  ('9911D66A4A796752DAA9929262692322', 3),
  ('88CB7A1006DB184386777ACF070430A9', 4),
  ('069B5562096AF76848A613F23073B4BA', 2),
  ('28A7C858D9231A3EC2C90820A26083DC', 2),
  ('A7EE9AEDB7325F55F14F2D2448170D56', 2),
  ('4B6EFCBC110DB539E9ECCD320DB55ADC', 3),
  ('AA33AA1CACC8C26E767C349BD5863426', 4),
  ('921E1C7C91CDCF1C6EC9C1FCD1EDF19F', 3),
  ('616E057B1ACDB7B28078AAD52EA67548', 2),
  ('EFA12B2F45FC838E19795CA997E26FFA', 4),
  ('D0D1271283601BF84573498EF352369D', 3),
  ('138B0A7B7D3B898E4314A73E45BEA369', 3),
  ('0FBF11956EE14B253F7FEA8160C31CDB', 2),
  ('8821CA9250E34A8696440A553317B22F', 2),
  ('BE047851D97506885B99BDDFA7A13360', 2),
  ('02856AFC22881ABCADDD5284BADDEB8D', 2),
  ('3CAE3CD87E55FD4246D4E3EE4483ED45', 2),
  ('A6519EA2BD56AFB2BE217E085A8310C2', 3),
  ('B508465FAC4F54A40CFDBB2B69707F5A', 1),
  ('DDABC5C

### Worker nodes simulation
We are using a for loop to simulate each worker node.

The partition_rez list is used to store the result from each partition, and then this list is used by the master node to combine the partial aggregates.

In [13]:
agg_zero_val = ["k1",-1000000,"k2",1000000]
# The list for storing the partitions aggregates.
partitions_rez = []
for partition in partitions:
    # This code will be executed on each worker node
    part_rez = agg_zero_val # initialise the aggregate
    for element in partition:
        part_rez = seqOp(part_rez, element) # update the aggregate with each element
    # Store the final partition aggregate in the list (for our simulation)
    # In Spark, at this point, the part_rez will be sent to the master node
    partitions_rez.append(part_rez)

In [14]:
# Print the results from each partition
for i, rez in enumerate(partitions_rez):
    print(i, rez)

0 ['529E7364F15A734DFB3F44376DC78267', 6, '3E8783B29ABF15A4F5B475D928D2A7BD', 1]
1 ['14C2ED390669165F2D3B5CE427A91027', 7, '2FBF9DAD51F548A6F1CE63165CC30BA7', 1]
2 ['DCD6A3DA3488EF99AAC46FE0EF41449B', 5, 'A8B92DC57A3DF887FF41DA5EBA763A97', 1]
3 ['00B7691D86D96AEBD21DD9E138F90840', 10, '956BC98C042F35EB171FF7FB1A6721B9', 1]
4 ['35A32952035266841AED93F2EFCEEB9D', 6, 'D8B109DC861AA892745CC4CFF78D98E3', 1]


### Master node simulation

The code that is executed on the master node.

This code creates the final aggregate from all partitions aggregates.

In [15]:
# We start with the initial (zero) value
final_rez = partitions_rez[0]
# In the simulation, we use a loop to process all partitions' aggregates.
for part_rez in partitions_rez[1:]:
    # In Spark, the master node will receive aggregates from the worker nodes
    # and will execute the combOp() function
    final_rez = combOp(final_rez, part_rez)
print(final_rez)

['00B7691D86D96AEBD21DD9E138F90840', 10, 'D8B109DC861AA892745CC4CFF78D98E3', 1]
