In [5]:
# Input files, read by main(). Rows are tab-separated and the first line of
# each file is a header (main filters it out of the register).
register_path = "/share/students/bigdata/Dati/Lab7/datiCompleti/register.csv"
stations_path = "/share/students/bigdata/Dati/Lab7/datiCompleti/stations.csv"

In [13]:
from pyspark import SparkConf, SparkContext
from datetime import datetime

In [43]:

def modify_timestamp(row):
    """Replace the timestamp column of a register row with its timeslot.

    The input row is tab-separated; column 1 must be a "%Y-%m-%d %H:%M:%S"
    timestamp. It is turned into "<weekday abbreviation>-<hour>", e.g.
    "Thu-12". The result keeps columns 0, 2 and 3 and drops any further
    columns.
    """
    fields = row.split("\t")
    moment = datetime.strptime(fields[1], "%Y-%m-%d %H:%M:%S")
    slot = "{}-{}".format(moment.strftime("%a"), moment.hour)
    return "\t".join((fields[0], slot, fields[2], fields[3]))
    
def make_key_values(row):
    """Map a reformatted register row to ((station, timeslot), (1, full_flag)).

    full_flag is 1 when column 3 parses to the integer 0 and 0 otherwise.
    Column 3 presumably holds the free slots, so 0 means the station was
    full; confirm against the dataset. The leading 1 counts the reading
    itself, so summing these pairs per key gives (readings, full_readings).
    """
    fields = row.split("\t")
    station, timeslot, free_slots = fields[0], fields[1], fields[3]
    is_full = 1 if int(free_slots) == 0 else 0
    return ((station, timeslot), (1, is_full))

def find_criticality(row):
    """Turn (key, (readings, full_readings)) into (key, full_readings / readings).

    Uses true division, so the criticality is a float in [0, 1] when the
    counts are non-negative.
    """
    counts = row[1]
    readings, full_readings = counts[0], counts[1]
    ratio = full_readings / readings
    return (row[0], ratio)

def modify_for_most_critical(row):
    """Re-key ((station, timeslot), criticality) by station alone.

    Returns (station, (timeslot, criticality)), so a per-station reduce can
    choose one timeslot per station.
    """
    station_and_slot, criticality = row[0], row[1]
    return (station_and_slot[0], (station_and_slot[1], criticality))

def reduce_by_criticality(v1, v2):
    """Return the preferred of two (timeslot, criticality) candidates.

    The function is used with reduceByKey, so it implements a total order
    that is commutative and associative:
      1. higher criticality wins;
      2. on a tie, the earlier hour wins, with hours compared as integers so
         that 9 comes before 10;
      3. on a further tie, the lexicographically smaller weekday
         abbreviation wins.

    Args:
        v1, v2: (timeslot, criticality) tuples; timeslot is "Day-Hour",
            e.g. "Thu-12".

    Returns:
        The winning tuple, or v1 when the two candidates are fully equal.
        It never returns None.
    """
    def _rank(candidate):
        # Smaller rank means more preferred; criticality is negated so the
        # highest criticality sorts first.
        timeslot, criticality = candidate
        day, _, hour = timeslot.partition("-")
        return (-criticality, int(hour), day)

    return v1 if _rank(v1) <= _rank(v2) else v2

def main(criticality_threshold = 0):
    """Find the most critical timeslot of every station and print a sample.

    A timeslot is a (weekday, hour) pair such as "Thu-12". The criticality
    of a (station, timeslot) pair is the fraction of its register readings
    that make_key_values flags as full. For each station, the timeslot with
    the highest criticality is kept, with ties broken by
    reduce_by_criticality.

    It prints the first 10 reformatted register rows and the first 10
    (station, (timeslot, criticality)) results.

    Args:
        criticality_threshold: if non-zero, only timeslots whose criticality
            is strictly greater than this value are considered. The default
            of 0 disables the filter, so zero-criticality timeslots are kept.
    """
    conf = SparkConf().setAppName("lab 7")
    ctx = SparkContext.getOrCreate(conf)

    # NOTE: stations_path is not needed for this computation.

    # Rows are tab-separated; drop the header line (first column "station").
    register_RDD = ctx.textFile(register_path).filter(
        lambda row: row.split("\t")[0] != 'station')

    # Discard readings where columns 2 and 3 are both 0 (presumably faulty
    # readings; confirm against the dataset description).
    register_RDD = register_RDD.filter(
        lambda row: int(row.split("\t")[2]) != 0 or int(row.split("\t")[3]) != 0)

    # Replace the full timestamp with its "Day-Hour" timeslot.
    register_RDD = register_RDD.map(modify_timestamp)

    # Sum ((station, timeslot), (1, full_flag)) per key into
    # (readings, full_readings). A plain reduceByKey is enough; this was
    # previously a combineByKey with an identity combiner.
    counts_RDD = register_RDD.map(make_key_values).reduceByKey(
        lambda a, b: (a[0] + b[0], a[1] + b[1]))

    # ((station, timeslot), criticality)
    criticality_RDD = counts_RDD.map(find_criticality)

    if criticality_threshold != 0:
        criticality_RDD = criticality_RDD.filter(
            lambda row: row[1] > criticality_threshold)

    # Re-key by station and keep the single most critical timeslot per station.
    biggest_criticality_RDD = (criticality_RDD
                               .map(modify_for_most_critical)
                               .reduceByKey(reduce_by_criticality))

    print(register_RDD.take(10))
    print(biggest_criticality_RDD.take(10))

# Entry point: run the analysis with the default threshold (0 disables the
# criticality filter in main).
if __name__ == "__main__":
    main()
    


['1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18', '1\tThu-12\t0\t18']




[('7', ('Fri-22', 0.37416107382550334)), ('80', ('Thu-10', 0.11837455830388692)), ('114', ('Mon-0', 0.39166666666666666)), ('130', ('Sun-5', 0.21441124780316345)), ('131', ('Mon-0', 0.25555555555555554)), ('136', ('Sat-5', 0.28295254833040423)), ('219', ('Fri-3', 0.08797653958944282)), ('250', ('Sat-10', 0.10350877192982456)), ('8', ('Sun-0', 0.41225626740947074)), ('20', ('Tue-1', 0.38509316770186336))]


                                                                                