In [None]:
rdd = sc.textFile("/FileStore/tables/input.txt")

In [None]:
# Word count

In [None]:
word_counts = rdd.flatMap(lambda line: line.split(" ")) \
                 .map(lambda word: word.lower()) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)

In [None]:
for word, count in word_counts.take(20):
    print(f"{word}: {count}")

: 26001
of: 15419
united: 503
mary: 56
new: 772
york: 116
macmillan: 2
1921: 3
copyright,: 1
company.: 11
set: 116
electrotyped.: 1
published: 15
1921.: 3
norwood: 1
cushing: 1
co.--berwick: 1
smith: 5
co.: 1
preface: 2


In [None]:
# Specific Words 

In [None]:
specific_words = ["america", "president", "washington"]

specific_word_counts = word_counts.filter(lambda x: x[0] in specific_words)

swc = word_counts.filter(lambda x: x[0] in specific_words).collect()
for word, count in swc:
    print(f"{word}: {count}")


america: 131
washington: 92
president: 237


In [None]:
# Top 10 Words

In [None]:
top_10_words = (rdd.flatMap(lambda line: line.split())
                .map(lambda word: word.lower())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b)
                .sortBy(lambda pair: pair[1], ascending=False)
                )


In [None]:
print(top_10_words)

[('the', 28958), ('of', 15419), ('and', 11461), ('to', 9120), ('in', 8062), ('a', 6494), ('was', 3870), ('that', 2937), ('with', 2693), ('he', 2665)]


In [None]:
rdd = sc.textFile("/FileStore/tables/city_temperature.csv")

# Skip header
header = rdd.first()
rdd = rdd.filter(lambda row: row != header)


In [None]:
def parse_line(line):
    return [x.strip() for x in line.split(",")]

parsed_rdd = rdd.map(parse_line)


In [None]:
#AvgTemp by Region

In [None]:
region_avg_temp = parsed_rdd.map(lambda x: (x[0], (float(x[7]), 1))) \
                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                            .mapValues(lambda v: v[0]/v[1]) \
                           

region_temp = parsed_rdd.map(lambda x: (x[0], (float(x[7]), 1))) \
                            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                            .mapValues(lambda v: v[0]/v[1]) \
                            .collect()

for region, avg_temp in region_temp:
    print(f"{region}: {avg_temp}")


Asia: 62.56865184754641
Africa: 53.54951656193528
Australia/South Pacific: 61.180869127275976
Europe: 46.69628524306878
Middle East: 68.38455378399779
North America: 55.300932625245935
South/Central America & Carribean: 62.189438801074665


In [None]:
# AvgTemp by Year in Asian Countries

In [None]:
asia_avg_temp_by_year = (parsed_rdd.filter(lambda x: x[0] == "Asia")
                         .map(lambda x: ((x[6], x[1]), (float(x[7]), 1)))
                         .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                         .mapValues(lambda v: v[0] / v[1])
                         .sortBy(lambda x: (x[0][0], x[0][1])))  # This line sorts by year and then by country

asia_avg_temp_year = asia_avg_temp_by_year.collect()

for (year, country), avg_temp in asia_avg_temp_year:
    print(f"{year}, {country}: {avg_temp}")

1995, Bangladesh: 42.21424657534248
1995, China: 59.772493150684916
1995, Hong Kong: 74.12
1995, India: 79.12102739726028
1995, Indonesia: 49.03205479452054
1995, Japan: 55.523652968036615
1995, Kazakhstan: 48.61095890410961
1995, Laos: 81.52958904109589
1995, Malaysia: 80.56465753424658
1995, Mongolia: 31.9356164383562
1995, Myanmar (Burma): -99.0
1995, Nepal: 22.466758241758246
1995, North Korea: 51.12876712328768
1995, Pakistan: 74.79246575342472
1995, Philippines: 80.56739726027399
1995, Singapore: 81.75616438356167
1995, South Korea: 51.75342465753425
1995, Sri Lanka: 77.38821917808224
1995, Taiwan: 73.02356164383565
1995, Tajikistan: -36.33616438356166
1995, Thailand: -99.0
1995, Turkmenistan: 63.47095890410957
1995, Uzbekistan: 59.08301369863013
1995, Vietnam: 74.38904109589043
1996, Bangladesh: 38.938524590163944
1996, China: 59.08896174863387
1996, Hong Kong: 74.97103825136611
1996, India: 79.10840163934427
1996, Indonesia: 47.78852459016394
1996, Japan: 54.4428961748633
1996,

In [None]:
# AvgTemp by Cities in Spain 

In [None]:
spain_avg_temp_by_city = parsed_rdd.filter(lambda x: x[1] == "Spain") \
                                  .map(lambda x: (x[3], (float(x[7]), 1))) \
                                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                                  .mapValues(lambda v: v[0]/v[1]) \


spain_avg_temp_city = parsed_rdd.filter(lambda x: x[1] == "Spain") \
                                  .map(lambda x: (x[3], (float(x[7]), 1))) \
                                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                                  .mapValues(lambda v: v[0]/v[1]) \
                                .collect()
                                  

for city, avg_temp in spain_avg_temp_city:
    print(f"{city}: {avg_temp}")


Barcelona: 61.26926397582565
Bilbao: 58.69515432764961
Madrid: 58.4405352903085


In [None]:
city_temp_rdd = sc.textFile("/FileStore/tables/city_temperature.csv")
country_list_rdd = sc.textFile("/FileStore/tables/country_list.csv")

header_city_temp = city_temp_rdd.first()
city_temp_rdd = city_temp_rdd.filter(lambda line: line != header_city_temp)

header_country_list = country_list_rdd.first()
country_list_rdd = country_list_rdd.filter(lambda line: line != header_country_list)



In [None]:
# Extract (City, (Country, AvgTemperature)) tuples from city_temperature.csv
city_temp_tuples = city_temp_rdd.map(lambda line: (line.split(",")[3], (line.split(",")[1], float(line.split(",")[7]))))

# Extract (Capital, Country) tuples from country-list.csv
country_capital_tuples = country_list_rdd.map(lambda line: (line.split(",")[1].replace("\"", "").strip(), line.split(",")[0].replace("\"", "").strip()))



In [None]:
joined_rdd = country_capital_tuples.join(city_temp_tuples)
final_rdd = joined_rdd.map(lambda x: (x[1][0], x[0], x[1][1][1]))


In [None]:
# AvgTemp in Capital for All Countries

In [None]:
rdd_for_avg = final_rdd.map(lambda x: (x[1], (x[2], 1)))
sum_counts_rdd = rdd_for_avg.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg_temp_rdd = sum_counts_rdd.map(lambda x: (x[0], x[1][0] / x[1][1]))
result_rdd = country_capital_tuples.join(avg_temp_rdd).map(lambda x: (x[1][0], x[0], x[1][1]))
print(result_rdd.take(10))



[('Armenia', 'Yerevan', 53.69699318040915), ('Bangladesh', 'Dhaka', 10.10987951807226), ('Barbados', 'Bridgetown', 77.00251697494743), ('Belgium', 'Brussels', 51.057047269587684), ('Croatia', 'Zagreb', 46.928613059902794), ('Finland', 'Helsinki', 42.24399956831414), ('Gambia', 'Banjul', 59.66582708528577), ('Ireland', 'Dublin', 49.06678178286205), ('Madagascar', 'Antananarivo', 63.445898985538584), ('Mozambique', 'Maputo', 62.78318631669572)]


In [None]:
spain_avg_temp_by_city.saveAsTextFile("dbfs:/FileStore/tables/spainavgtemp")
result_rdd.saveAsTextFile("dbfs:/FileStore/tables/results")