In [20]:
import pyspark

# Get the existing SparkContext for this kernel, or create one if none exists yet.
sc = pyspark.SparkContext.getOrCreate()

# House-price sample: each line is read as one raw text record of the RDD.
test_file = "file:///home/jovyan/work/sample/house_price.csv"
lines = sc.textFile(test_file)

def parse_line(line: str):
    """Turn one ``city,price,count`` line into an integer ``(price, count)`` pair.

    The first (city) column is read but discarded. A line that does not split
    into exactly three comma-separated fields, or whose price/count is not an
    integer, raises ``ValueError``.
    """
    fields = line.split(",")
    _city, price_text, count_text = fields
    price_value = int(price_text)
    count_value = int(count_text)
    return (price_value, count_value)

# (price, count) pairs, keyed by price.
price_count = lines.map(parse_line)
print(price_count.collect())

# Per price: (sum of counts, number of records). parse_line already yields ints,
# so the reducer can add the components directly without re-casting.
sum_of_count = (
    price_count
    .mapValues(lambda count: (count, 1))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)
print(sum_of_count.collect())

# Average count per price (true division, so the values are floats).
avg_by_count = sum_of_count.mapValues(lambda total_count: total_count[0] / total_count[1])
results = avg_by_count.collect()
print(results)

[(10000, 3), (10000, 5), (40000, 7), (5000, 7), (4000, 2), (9000, 4), (5000, 7), (4000, 2), (8000, 9)]
[(10000, (8, 2)), (40000, (7, 1)), (5000, (14, 2)), (4000, (4, 2)), (9000, (4, 1)), (8000, (9, 1))]
[(10000, 4.0), (40000, 7.0), (5000, 7.0), (4000, 2.0), (9000, 4.0), (8000, 9.0)]


In [25]:
# City temperature sample; the first line is the CSV header.
test_file = "file:///home/jovyan/work/sample/temperature.csv"
lines = sc.textFile(test_file)
# take(5) ships only the first five lines to the driver, whereas collect()[:5]
# would materialize the entire file in driver memory before slicing.
lines.take(5)

['"record_id","month","day","year","AverageTemperatureFahr","AverageTemperatureUncertaintyFahr","City","country_id","Country","Latitude","Longitude"',
 '474376,"01","01","1853",NA,NA,"Auckland","NEW","New Zealand","36.17S","175.03E"',
 '474377,"02","01","1853",NA,NA,"Auckland","NEW","New Zealand","36.17S","175.03E"',
 '474378,"03","01","1853",NA,NA,"Auckland","NEW","New Zealand","36.17S","175.03E"',
 '474379,"04","01","1853",NA,NA,"Auckland","NEW","New Zealand","36.17S","175.03E"']

In [None]:
# First line of the temperature file (its CSV header); get_data compares each line
# against it so the header row is skipped during parsing.
# NOTE(review): this cell is "In [None]" in the saved run, but later cells use
# `header` -- it must be executed before the parsing cell.
header = lines.first()

In [28]:
def get_data(line, header, city_col=6, temp_col=4):
    """Yield ``(city, avg_temp_fahr)`` for one raw CSV line, or nothing for the header.

    Written as a generator so it can be passed to ``RDD.flatMap``: the header line
    yields zero records and every other line yields exactly one.

    Args:
        line: One raw line of the temperature CSV.
        header: The file's header line; a line equal to it is skipped.
        city_col: Zero-based index of the city column (default 6, ``"City"``).
        temp_col: Zero-based index of the average-temperature column
            (default 4, ``"AverageTemperatureFahr"``).

    Yields:
        ``(city, avg_temp_fahr)``, both as strings with CSV quoting removed.
        Missing temperatures come through as the literal ``"NA"``.
    """
    # Local import keeps this cell self-contained.
    import csv

    if line != header:
        # csv.reader honours double-quote quoting, so a comma inside a quoted field
        # (e.g. "Washington, D.C.") no longer shifts the columns as str.split(",") did,
        # and quoted values are unquoted consistently for every column.
        col = next(csv.reader([line]))
        yield (col[city_col], col[temp_col])

In [37]:
# One (city, avg_temp_fahr) pair per data row; get_data emits nothing for the header.
parsed_line = lines.flatMap(lambda line: get_data(line, header))
# take(5) returns the same first five records without collecting the whole RDD.
print(parsed_line.take(5))

# Drop rows whose temperature is missing. The substring test also catches a
# quoted '"NA"' value if the temperature column was left quoted by the parser.
filtered_line = parsed_line.filter(lambda pair: "NA" not in pair[1])
print(filtered_line.take(5))

[('Auckland', 'NA'), ('Auckland', 'NA'), ('Auckland', 'NA'), ('Auckland', 'NA'), ('Auckland', 'NA')]
[('Auckland', '51.9062'), ('Auckland', '52.3886'), ('Auckland', '52.853'), ('Auckland', '52.5776'), ('Auckland', '54.8726')]


In [47]:
# Convert temperatures to float *before* reducing. reduceByKey only calls its function
# for keys with two or more records, so converting inside the reducer left any city
# with a single reading as a string; sorting a mix of str and float raises TypeError.
min_temp = filtered_line.mapValues(float).reduceByKey(min)
final_temp = min_temp.collect()

# Cities ordered from coldest to warmest minimum average temperature.
sorted(final_temp, key=lambda x: x[1])

[('Kiev', 2.85619999999999),
 ('Uppsala', 6.0494),
 ('Warsaw', 6.8),
 ('Kherson', 7.0952),
 ('Lvov', 7.1726),
 ('Wroclaw', 9.167),
 ('NA', 12.4682),
 ('Stockholm', 13.3988),
 ('Odesa', 14.8838),
 ('Paris', 25.0232),
 ('Tokyo', 29.156),
 ('Tottori', 34.2518),
 ('Marseille', 39.3908),
 ('Johannesburg', 42.1772),
 ('Hamilton', 44.564),
 ('Auckland', 49.856),
 ('Cape Town', 49.9946),
 ('Canoas', 50.009),
 ('Brasília', 62.9744)]

In [45]:
# One "city:min_temp" line per city, in the order collect() returned them (unsorted).
for name, lowest in final_temp:
    print(name, lowest, sep=":")

Auckland:49.856
NA:12.4682
Johannesburg:42.1772
Marseille:39.3908
Odesa:14.8838
Tottori:34.2518
Warsaw:6.8
Brasília:62.9744
Canoas:50.009
Cape Town:49.9946
Hamilton:44.564
Kherson:7.0952
Kiev:2.85619999999999
Lvov:7.1726
Paris:25.0232
Stockholm:13.3988
Tokyo:29.156
Uppsala:6.0494
Wroclaw:9.167
