In [25]:
from pyspark import SparkContext

In [26]:
# Reuse the SparkContext that is already running in this kernel, or start one
# if none exists, so that re-running this cell does not fail.
sc = SparkContext.getOrCreate()
# Bare last expression: display the context in the cell output.
sc

In [27]:
def parseLines(line):
    """Turn one comma-separated record into [stationId, entryType, temperature].

    Field 0 is taken as the station id and field 2 as the entry type.
    Field 3 is read as a number, scaled by 0.1 and passed through the
    Celsius-to-Fahrenheit formula (value * 9/5 + 32); presumably the raw
    reading is in tenths of a degree Celsius -- confirm against the dataset
    documentation.

    The conversion is applied whatever the entry type is, so callers are
    expected to filter on the entry type before using the number.

    Raises IndexError if the line has fewer than four fields and ValueError
    if field 3 is not numeric.
    """
    fields = line.split(',')
    # Keep the evaluation order (scale, then * 9/5, then + 32) so the
    # floating-point result is bit-for-bit what callers already see.
    fahrenheit = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return [fields[0], fields[2], fahrenheit]

In [28]:
# Load the CSV as an RDD with one string element per line of the file.
# The path is relative to the notebook's working directory.
rdd = sc.textFile("../datasets/1800.csv")
# Preview the first ten raw lines to check the column layout.
rdd.take(10)

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,',
 'ITE00100554,18000102,TMAX,-60,,I,E,',
 'ITE00100554,18000102,TMIN,-125,,,E,',
 'GM000010962,18000102,PRCP,0,,,E,',
 'EZE00100082,18000102,TMAX,-44,,,E,',
 'EZE00100082,18000102,TMIN,-130,,,E,']

In [29]:
# Parse every raw line into [stationId, entryType, temperature].
# parseLines converts field 3 for every row, regardless of entry type.
parsedRdd = rdd.map(parseLines)
# Preview the first ten parsed records.
parsedRdd.take(10)

[['ITE00100554', 'TMAX', 18.5],
 ['ITE00100554', 'TMIN', 5.359999999999999],
 ['GM000010962', 'PRCP', 32.0],
 ['EZE00100082', 'TMAX', 16.52],
 ['EZE00100082', 'TMIN', 7.699999999999999],
 ['ITE00100554', 'TMAX', 21.2],
 ['ITE00100554', 'TMIN', 9.5],
 ['GM000010962', 'PRCP', 32.0],
 ['EZE00100082', 'TMAX', 24.08],
 ['EZE00100082', 'TMIN', 8.599999999999998]]

In [30]:
# Keep only the records whose entry type (index 1) is 'TMIN'.
filteredRdd = parsedRdd.filter(lambda entry: entry[1] == 'TMIN')
# Preview the first ten TMIN records.
filteredRdd.take(10)

[['ITE00100554', 'TMIN', 5.359999999999999],
 ['EZE00100082', 'TMIN', 7.699999999999999],
 ['ITE00100554', 'TMIN', 9.5],
 ['EZE00100082', 'TMIN', 8.599999999999998],
 ['ITE00100554', 'TMIN', 23.72],
 ['EZE00100082', 'TMIN', 18.86],
 ['ITE00100554', 'TMIN', 29.66],
 ['EZE00100082', 'TMIN', 18.68],
 ['ITE00100554', 'TMIN', 30.919999999999998],
 ['EZE00100082', 'TMIN', 21.56]]

In [31]:
# Drop the entry type and keep [stationId, temperature] so the station id
# can act as the key in the reduction that follows.
toReduceRdd = filteredRdd.map(lambda entry: [entry[0], entry[2]])
# Preview the first ten key/value pairs.
toReduceRdd.take(10)

[['ITE00100554', 5.359999999999999],
 ['EZE00100082', 7.699999999999999],
 ['ITE00100554', 9.5],
 ['EZE00100082', 8.599999999999998],
 ['ITE00100554', 23.72],
 ['EZE00100082', 18.86],
 ['ITE00100554', 29.66],
 ['EZE00100082', 18.68],
 ['ITE00100554', 30.919999999999998],
 ['EZE00100082', 21.56]]

In [32]:
# Collapse the values of each station down to the smallest one; the builtin
# min is already a two-argument reducer, so no lambda wrapper is needed.
reducedRdd = toReduceRdd.reduceByKey(min)
# One pair per station remains, so collecting to the driver is cheap.
reducedRdd.collect()

[('ITE00100554', 5.359999999999999), ('EZE00100082', 7.699999999999999)]

In [33]:
# Bring the per-station results to the driver as a dict and order the
# (stationId, temperature) pairs by station id for stable display.
toShow = sorted(reducedRdd.collectAsMap().items())
toShow

[('EZE00100082', 7.699999999999999), ('ITE00100554', 5.359999999999999)]

In [34]:
# Print one line per station with the temperature rounded to two decimals.
# The label previously read "minimun"; it is spelled "minimum" here.
for stationId, temperature in toShow:
    print("{} minimum temperature: {:.2f}".format(stationId, temperature))

EZE00100082 minimun temperature: 7.70
ITE00100554 minimun temperature: 5.36


In [59]:
def findMinimum(rdd):
    """Print the lowest TMIN temperature found for each station.

    Args:
        rdd: an RDD of raw comma-separated lines in the layout that
            parseLines expects.

    Each line is parsed with parseLines, only 'TMIN' entries are kept, and
    the values are reduced per station id with min. The results are collected
    to the driver and printed one line per station, sorted by station id,
    with the temperature formatted to two decimals. Returns None.
    """
    minByStation = (
        rdd.map(parseLines)
        .filter(lambda entry: entry[1] == "TMIN")
        .map(lambda entry: (entry[0], entry[2]))
        .reduceByKey(min)
    )
    # Keys are unique after reduceByKey, so sorting the collected pairs
    # orders them by station id without a dict round-trip.
    for stationId, temperature in sorted(minByStation.collect()):
        print("{} minimum temperature: {:.2f}".format(stationId, temperature))

In [60]:
def findMaximum(rdd):
    """Print the highest TMAX temperature found for each station.

    Args:
        rdd: an RDD of raw comma-separated lines in the layout that
            parseLines expects.

    Each line is parsed with parseLines, only 'TMAX' entries are kept, and
    the values are reduced per station id with max. The results are collected
    to the driver and printed one line per station, sorted by station id,
    with the temperature formatted to two decimals. Returns None.
    """
    parsedEntries = rdd.map(parseLines)
    tmaxEntries = parsedEntries.filter(lambda entry: entry[1] == "TMAX")
    # [stationId, temperature] pairs keyed by station id.
    stationPairs = tmaxEntries.map(lambda entry: [entry[0], entry[2]])
    maxByStation = stationPairs.reduceByKey(lambda left, right: max(left, right))
    orderedResults = sorted(dict(maxByStation.collect()).items())
    for stationId, temperature in orderedResults:
        print("{} maximum temperature: {:.2f}".format(stationId, temperature))

In [61]:
# Report the per-station minimums, then the maximums, separated by a blank line.
findMinimum(rdd)
print()
findMaximum(rdd)

EZE00100082 minimun temperature: 7.70
ITE00100554 minimun temperature: 5.36

EZE00100082 maximum temperature: 90.14
ITE00100554 maximum temperature: 90.14
