In [10]:
import os
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession.builder.appName('solution_ex_13').getOrCreate()

In [12]:
inputPath = './data'
in_file_neigh = inputPath + '/neighbors.txt'
reads_file = inputPath + '/readings.txt'

outputPath_1 = './output_ex_13_reads'
outputPath_2 = './output_ex_13_neigh'

print(os.path.exists(inputPath))
print(os.path.exists(outputPath_1))
print(os.path.exists(outputPath_2))

True
False
False


In [13]:
readRDD = spark.sparkContext.textFile(reads_file)
treshFreeSlots = 3
treshCritPerc = 0.8

In [14]:
def criticalSituation(line):
    fields = line.split(",")
    # fields[0] -> station id
    # fields[5] -> free slots
    stationId = fields[0]
    numFreeSlots = int(fields[5])
    
    if  numFreeSlots < treshFreeSlots:
        return (stationId, (1, 1))
    else:
        return (stationId, (1, 0))

In [15]:
# for each read/line map them to a read - situation rdd
stationReads = readRDD.map(criticalSituation)

# for each station compute the total of reads and the total of critical situations
stationCounts = stationReads.reduceByKey(lambda c1, c2: (c1[0]+c2[0], c1[1]+c2[1]) )

# compute percentages for station of critSituations
stationCritSituation = stationCounts.mapValues(lambda counters: counters[1]/counters[0])

# stations sorted
selectedStationsSorted = stationCritSituation.sortBy(lambda sensorPerc: sensorPerc[1], ascending=False)

selectedStationsSorted.collect()

[('s4', 1.0), ('s3', 0.4), ('s2', 0.25), ('s1', 0.2), ('s5', 0.2)]

In [16]:
selectedStationsSorted.saveAsTextFile(outputPath_1)

                                                                                

In [17]:
neighborsRDD = spark.sparkContext.textFile(in_file_neigh)

# Map each line of the input file to a pair stationid, list of neighbor stations
nPairRDD = neighborsRDD.map(lambda line: (line.split(",")[0], line.split(",")[1].split(" ")) )

# Create a local dictionary in the main memory of the driver that will be used to store the mapping 
# stationid -> list of neighbors
# There are only 100 stations. Hence, you can suppose that data about neighbors can be stored in the main memory
neighbors=nPairRDD.collectAsMap()

# Select the lines/readings associated with a full status (number of free slots equal to 0)
fullStatusLines = readRDD.filter(lambda line: int(line.split(",")[5])==0)

In [18]:
def extractTimestamp(reading):
    fields = reading.split(",")
    timestamp = fields[1] + fields[2] + fields[3]
    
    return timestamp

In [19]:
# Create an RDD of pairs with key = timestamp and value=reading associated with that timestamp
# The concatenation of fields[1], fields[2], fields[3] is the timestamp of the reading
fullLinesPRDD = fullStatusLines.map(lambda reading: (extractTimestamp(reading), reading))

#  Collapse all the values with the same key in one single pair (timestamp, reading associated with that timestamp)
fullReadingsPerTimestamp = fullLinesPRDD.groupByKey()

In [20]:
def selectReadingssFunc(pairTimeStampListReadings):
    # Extract the list of stations that appear in the readings
    # associated with the current key 
    # (i.e., the list of stations that are full in this timestamp)
    # The list of readings is in the value part of the inpput key-value pair
    stations = []
    for reading in pairTimeStampListReadings[1]:
        # Extract the stationid from each reading
        fields = reading.split(",")
        stationId = fields[0]
        stations.append(stationId)
        
        
    # Iterate again over the list of readings to select the readings satistying the constraint on the 
    # full status situation of all neighboors 
    selectedReading = []

    for reading in pairTimeStampListReadings[1]:
        # This reading must be selected if all the neighbors of
        # the station of this reading are also in the value of
        # the current key-value pair (i.e., if they are in list stations)
        # Extract the stationid of this reading
        fields = reading.split(",")
        stationId = fields[0]

        # Select the list of neighbors of the current station
        nCurrentStation = neighbors[stationId]
        
        # Check if all the neighbors of the current station are in value 
        # (i.e., the local list stations) of the current key-value pair
        allNeighborsFull = True
        
        for neighborStation in nCurrentStation:
            if neighborStation not in stations:
                # There is at least one neighbor of th current station
                # that is not in the full status in this timestamp
                allNeighborsFull = False
                
        if allNeighborsFull == True:
            selectedReading.append(reading)
            
    return selectedReading

In [21]:
# Each pair contains a timestamp and the list of readings (with number of free slots equal to 0) 
# associated with that timestamp.
# Check, for each reading in the list, if all the neighbors of the station of that reading are 
# also present in this list of readings
# Emit one "string" for each reading associated with a completely full status 
selectedReadingsRDD = fullReadingsPerTimestamp.flatMap(selectReadingssFunc)

In [22]:
# Store the result in HDFS
selectedReadingsRDD.saveAsTextFile(outputPath_2)