In [None]:
# Install PySpark and Java (Colab needs this for Apache Spark)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark



In [None]:
# Import necessary libraries and start Spark session
from pyspark import SparkContext, SparkConf
from google.colab import drive
import re

In [None]:
# Set up SparkContext
conf = SparkConf().setAppName("Q2_Temperature").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [None]:
# Mount Google Drive to access your files
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read the text file from Google Drive.
inp_city = sc.textFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/Dataset/city_temperature.csv")
print(inp_city.collect())  # To see the content of the file (optional)

inp_country = sc.textFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/Dataset/country-list.csv")
print(inp_country.collect())  # To see the content of the file (optional)

Buffered data was truncated after reaching the output size limit.

In [None]:
# Clean and Split City Temperature Data
#Each line has 8 columns: Region,Country,State,City,Month,Day,Year,AvgTemperature
header_city = inp_city.first()
city_data = (
    inp_city.filter(lambda l: l != header_city)
            .map(lambda l: l.split(","))
            .filter(lambda x: len(x) == 8 and x[7].strip() != "")
            .map(lambda x: (x[0].strip(), x[1].strip(), x[2].strip(),
                            x[3].strip(), int(x[4]), int(x[5]), int(x[6]),
                            float(x[7])))
)

In [None]:
#Clean and Split Country List Data
#Columns: country, capital, type

header_country = inp_country.first()
country_data = (
    inp_country.filter(lambda l: l != header_country)
               .map(lambda l: l.split(","))
               .filter(lambda x: len(x) == 3)
               .map(lambda x: (x[0].strip(), x[1].strip(), x[2].strip()))
)


# **Q2A. Find the average of AvgTemperature for each Region.**

In [18]:
region_avg = (
    city_data.map(lambda x: (x[0], (x[7], 1)))
             .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
             .mapValues(lambda v: round(v[0]/v[1], 2))
)
region_avg.coalesce(1).saveAsTextFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/q2A_output")


# **Q2B. Find the average of AvgTemperature by Month for countries only located in the “Asia” Region**

In [12]:
asia_avg = (
    city_data.filter(lambda x: x[0].lower() == "asia")
             .map(lambda x: (x[4], (x[7], 1)))   # key = Month
             .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
             .mapValues(lambda v: round(v[0]/v[1], 2))
)
asia_avg.coalesce(1).saveAsTextFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/q2B_output")


# **Q2C. Find the average of AvgTemperature by City only located in the Country “Germany”**

In [13]:
germany_city_avg = (
    city_data.filter(lambda x: x[1].lower() == "germany")
             .map(lambda x: (x[3], (x[7], 1)))   # key = City
             .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
             .mapValues(lambda v: round(v[0]/v[1], 2))
)
germany_city_avg.coalesce(1).saveAsTextFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/q2C_output")

# **Q2D. For each country, find the capital and average of AvgTemperature of that capital city. Your**
output file should contain: <Country><TAB><Year><TAB><Avg of AvgTemperature of the Country>

In [28]:
import csv, unicodedata, re, os, shutil

def parse_csv(parts):
    for line in parts:
        for row in csv.reader([line]):   # safe for quoted commas
            yield row

def norm(s):
    s = s or ""
    s = unicodedata.normalize("NFKD", s).encode("ascii","ignore").decode("ascii")
    s = re.sub(r"[^a-z\s]", " ", s.lower().strip())
    return re.sub(r"\s+", " ", s)

# Read as TEXT and drop headers
city_txt    = sc.textFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/Dataset/city_temperature.csv")
country_txt = sc.textFile("/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/Dataset/country-list.csv")
city_hdr, country_hdr = city_txt.first(), country_txt.first()
city_rows    = city_txt.filter(lambda l: l != city_hdr).mapPartitions(parse_csv)
country_rows = country_txt.filter(lambda l: l != country_hdr).mapPartitions(parse_csv)

# Keep valid rows -> RDD SHAPES
# city_data: (Region, Country, State, City, Month, Day, Year, Temp)
def isfloat(s):
    try: float(s); return True
    except: return False

city_data = (city_rows
             .filter(lambda x: len(x) >= 8 and x[6].strip().isdigit() and isfloat(x[7]))
             .map(lambda x: (x[0].strip(), x[1].strip(), x[2].strip(), x[3].strip(),
                             int(x[4]), int(x[5]), int(x[6]), float(x[7]))))

# country_data: (Country, Capital, Type)
country_data = (country_rows
                .filter(lambda x: len(x) >= 2)
                .map(lambda x: (x[0].strip(), x[1].strip(), (x[2].strip() if len(x)>2 else ""))))

print("rows city_data       :", city_data.count())
print("rows country_data    :", country_data.count())

# DIAG: check country-name intersection
city_countries    = city_data.map(lambda x: x[1].strip()).distinct()
list_countries    = country_data.map(lambda x: x[0].strip()).distinct()
print("distinct in city    :", city_countries.count())
print("distinct in list    :", list_countries.count())
print("intersection(count) :", city_countries.intersection(list_countries).count())

# Build normalized keys to JOIN BY CAPITAL/CITY NAME
# city key: normalized City;   value: (year, temp, country_orig)
# capital key: normalized Capital; value: country_orig
city_by_name = city_data.map(lambda r: (norm(r[3]), (r[6], r[7], r[1])))
cap_to_ctry  = country_data.map(lambda r: (norm(r[1]), r[0]))

# Quick DIAG: how many names overlap?
city_names   = city_by_name.keys().distinct()
capital_names= cap_to_ctry.keys().distinct()
name_overlap = city_names.intersection(capital_names).count()
print("capital/city name overlap (after norm):", name_overlap)

# Join by normalized city/capital
joined = city_by_name.join(cap_to_ctry)   # (cap_norm, ((year,temp,country), country_from_list))
jn_count = joined.count()
print("joined rows (by name):", jn_count)
if jn_count == 0:
    # Print a few examples from each side
    print("Example city names:", city_names.take(10))
    print("Example capital names:", capital_names.take(10))

# 6) Average by (Country, Year)
pairs = joined.map(lambda kv: ((kv[1][1], kv[1][0][0]), (kv[1][0][1], 1.0)))
avg   = (pairs
         .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
         .mapValues(lambda sc: round(sc[0]/sc[1], 2)))

print("sample avg:", avg.take(10))

# 7) Save ONE TSV (Country<TAB>Year<TAB>Avg of AvgTemperature of the Country)
outD = "/content/drive/MyDrive/ColabNotebooks/CS4371_HW3_Natthiya/q2D_output"
if os.path.exists(outD): shutil.rmtree(outD)
(avg.map(lambda kv: f"{kv[0][0]}\t{kv[0][1]}\t{kv[1]}").coalesce(1).saveAsTextFile(outD))


rows city_data       : 2906327
rows country_data    : 248
distinct in city    : 125
distinct in list    : 247
intersection(count) : 114
capital/city name overlap (after norm): 106
joined rows (by name): 931683
sample avg: [(('Ethiopia', 2000), 19.28), (('Ethiopia', 2010), 11.77), (('Ethiopia', 2011), 57.47), (('Ethiopia', 2013), 60.76), (('Malawi', 1998), -5.16), (('Malawi', 1999), -29.41), (('Malawi', 2002), -39.44), (('Malawi', 2003), -87.36), (('Malawi', 2004), -99.0), (('Kyrgyzstan', 1998), 21.77)]
