In [13]:
# Import required libraries
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Reuse the SparkContext already bound to `sc` in this kernel, otherwise obtain one.
try:
    sc
except NameError:
    # `sc` is unbound, but a context may still be active in this process
    # (for example one started through a SparkSession). Calling the
    # SparkContext constructor in that situation raises an error, whereas
    # getOrCreate() returns the active context or builds one from `conf`.
    conf = SparkConf().setAppName("WorldCitiesPop").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    print("Created new SparkContext")
else:
    print("Using existing SparkContext")

# Exercise 1: Mise en route
print("\n===== Exercise 1: Mise en route =====")

# Check if the worldcitiespop.txt file exists
worldcities_path = "/content/sample_data/worldcitiespop.txt"

if not os.path.exists(worldcities_path):
    print(f"File not found: {worldcities_path}")
    print("Please make sure the file is in the correct location.")
else:
    # Display the first few lines using terminal command
    !head -n 5 {worldcities_path}

    # Read the file into an RDD
    rddCities = sc.textFile(worldcities_path)
    print("\nFirst line of the file:")
    print(rddCities.first())

    # Exercise 2: Nettoyage simple worldcitiespop
    print("\n===== Exercise 2: Nettoyage simple worldcitiespop =====")

    # Clean the data - keep only lines with population data
    # The population is the 5th field (index 4) when splitting by comma
    header = rddCities.first()  # Save the header line

    # Filter to keep only lines with population data
    validCities = rddCities.filter(lambda line: line != header and
                                  len(line.split(",")) > 4 and
                                  line.split(",")[4].strip() != "" and
                                  line.split(",")[4].strip().isdigit())

    # Count the number of valid cities
    validCount = validCities.count()
    print(f"Number of cities with population data: {validCount}")

    # Display some examples of valid cities
    print("Examples of valid cities:")
    for city in validCities.take(5):
        print(city)

    # Write the cleaned data to a new file
    # First define the output file path
    cleaned_file_path = "/content/cleaned_worldcitiespop.txt"

    # Save as a single file (good for smaller datasets)
    validCities.coalesce(1).saveAsTextFile("/content/cleaned_data")

    print(f"\nCleaned data has been saved to: /content/cleaned_data/part-00000")
    print("You can access it in the file browser on the left sidebar of Colab.")

    # Optional: Show the first few lines of the cleaned file
    print("\nFirst few lines of the cleaned data:")
    !head -n 5 /content/cleaned_data/part-00000

Using existing SparkContext

===== Exercise 1: Mise en route =====
Country,City,AccentCity,Region,Population,Latitude,Longitude
ad,aixas,Aix�s,06,,42.4833333,1.4666667
ad,aixirivali,Aixirivali,06,,42.4666667,1.5
ad,aixirivall,Aixirivall,06,,42.4666667,1.5
ad,aixirvall,Aixirvall,06,,42.4666667,1.5

First line of the file:
Country,City,AccentCity,Region,Population,Latitude,Longitude

===== Exercise 2: Nettoyage simple worldcitiespop =====
Number of cities with population data: 47980
Examples of valid cities:
ad,andorra la vella,Andorra la Vella,07,20430,42.5,1.5166667
ad,canillo,Canillo,02,3292,42.5666667,1.6
ad,encamp,Encamp,03,11224,42.5333333,1.5833333
ad,la massana,La Massana,04,7211,42.55,1.5166667
ad,les escaldes,Les Escaldes,08,15854,42.5,1.5333333

Cleaned data has been saved to: /content/cleaned_data/part-00000
You can access it in the file browser on the left sidebar of Colab.

First few lines of the cleaned data:
ad,andorra la vella,Andorra la Vella,07,20430,42.5,1.5166667
ad,

In [15]:
# Exercise 3: Statistics on City Populations

# Import required libraries
from pyspark import SparkContext, SparkConf
import numpy as np
import math
import os
import shutil  # For removing directories

# Reuse the SparkContext already bound to `sc` in this kernel, otherwise obtain one.
try:
    sc
except NameError:
    # `sc` is unbound, but a context may still be active in this process;
    # the SparkContext constructor would raise in that case, while
    # getOrCreate() returns the active context or builds one from `conf`.
    conf = SparkConf().setAppName("CityPopulationStats").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    print("Created new SparkContext")
else:
    print("Using existing SparkContext")

# Path to the data file
worldcities_path = "/content/sample_data/worldcitiespop.txt"

# Read the file into an RDD; the first line is the CSV header
rddCities = sc.textFile(worldcities_path)
header = rddCities.first()

# Keep only data lines whose 5th field (index 4, Population) is made of digits.
# Cached: six actions below (count, min, max, two sums, save) all start from
# this RDD, and without caching each one re-reads and re-filters the input file.
validCities = rddCities.filter(lambda line:
                              line != header and
                              len(line.split(",")) > 4 and
                              line.split(",")[4].strip() != "" and
                              line.split(",")[4].strip().isdigit()).cache()

# Extract the population values as integers
populations = validCities.map(lambda line: int(line.split(",")[4]))

# Calculate statistics. The variance divides by `count`, i.e. it is the
# population variance, not the sample (n - 1) variance.
count = populations.count()
min_pop = populations.min()
max_pop = populations.max()
sum_pop = populations.sum()
mean_pop = sum_pop / count
variance = populations.map(lambda x: (x - mean_pop) ** 2).sum() / count
stdev = math.sqrt(variance)

# Print the statistics
print("\n===== Population Statistics =====")
print(f"Count: {count}")
print(f"Minimum Population: {min_pop}")
print(f"Maximum Population: {max_pop}")
print(f"Sum of Populations: {sum_pop}")
print(f"Average Population: {mean_pop:.2f}")
print(f"Standard Deviation: {stdev:.2f}")

# Format as in the expected output
print(f"(count: {count}, mean: {mean_pop}, stdev: {stdev}, max: {float(max_pop)}, min: {float(min_pop)})")

# Define the output directory
output_dir = "/content/cleaned_data_ex3"

# saveAsTextFile refuses to write into an existing directory, so drop the
# output of any previous run first.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Save the clean data as a single part file
validCities.coalesce(1).saveAsTextFile(output_dir)
print(f"\nCleaned data saved to: {output_dir}/part-00000")

Using existing SparkContext

===== Population Statistics =====
Count: 47980
Minimum Population: 7
Maximum Population: 31480498
Sum of Populations: 2289584999
Average Population: 47719.57
Standard Deviation: 302885.56
(count: 47980, mean: 47719.57063359733, stdev: 302885.5592040371, max: 31480498.0, min: 7.0)

Cleaned data saved to: /content/cleaned_data_ex3/part-00000


In [16]:
# Exercise 4: Population Histograms

# Import required libraries
from pyspark import SparkContext, SparkConf
import numpy as np
import math
import os
import shutil  # For removing directories

# Reuse the SparkContext already bound to `sc` in this kernel, otherwise obtain one.
try:
    sc
except NameError:
    # `sc` is unbound, but a context may still be active in this process;
    # the SparkContext constructor would raise in that case, while
    # getOrCreate() returns the active context or builds one from `conf`.
    conf = SparkConf().setAppName("CityPopulationHistogram").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    print("Created new SparkContext")
else:
    print("Using existing SparkContext")

# Path to the data file
worldcities_path = "/content/sample_data/worldcitiespop.txt"

# Read the file into an RDD; the first line is the CSV header
rddCities = sc.textFile(worldcities_path)
header = rddCities.first()

# Keep only data lines whose 5th field (index 4, Population) is made of digits.
# Cached: every action in this cell (statistics, histogram, save) starts from
# this RDD, and without caching each one re-reads and re-filters the input file.
validCities = rddCities.filter(lambda line:
                              line != header and
                              len(line.split(",")) > 4 and
                              line.split(",")[4].strip() != "" and
                              line.split(",")[4].strip().isdigit()).cache()

# Extract the population values as integers
populations = validCities.map(lambda line: int(line.split(",")[4]))

# Calculate statistics (same as Exercise 3); the variance divides by `count`,
# i.e. it is the population variance.
count = populations.count()
min_pop = populations.min()
max_pop = populations.max()
sum_pop = populations.sum()
mean_pop = sum_pop / count
variance = populations.map(lambda x: (x - mean_pop) ** 2).sum() / count
stdev = math.sqrt(variance)

# Print the statistics
print("\n===== Population Statistics =====")
print(f"(count: {count}, mean: {mean_pop}, stdev: {stdev}, max: {float(max_pop)}, min: {float(min_pop)})")

# Calculate the histogram with logarithmic scale
# Class 0: [0-10[, Class 1: [10-100[, Class 2: [100-1000[, etc.
def get_log_class(population):
    """Return the base-10 logarithmic class of a population.

    Class k covers [10**k, 10**(k+1)[; class 0 additionally contains 0.

    Args:
        population: Non-negative number (the callers in this cell pass ints).

    Returns:
        int: The class index.

    Raises:
        ValueError: If population is negative.
    """
    if population == 0:
        return 0
    if population < 0:
        raise ValueError("population must be non-negative")
    if isinstance(population, int):
        # Counting decimal digits is exact for any int, whereas
        # int(math.log10(n)) overshoots once n is too large for a float to
        # tell apart from the next power of ten (e.g. 10**17 - 1).
        return len(str(int(population))) - 1
    # Non-integer input: keep the floating-point computation.
    return int(math.log10(population))

# Map each population to its logarithmic class
histogram_data = populations.map(lambda pop: (get_log_class(pop), 1))

# Count the number of cities in each class
histogram = histogram_data.reduceByKey(lambda a, b: a + b)

# Sort by class and bring the (small) result back to the driver
sorted_histogram = histogram.sortByKey().collect()

# Print the histogram.
# The loop variable is deliberately NOT called `count`: that name holds the
# total number of cities computed earlier in this cell, and reusing it here
# would silently overwrite it with the size of the last class.
print("\n===== Population Histogram (Logarithmic Scale) =====")
print("Class\tRange\t\tCount")
for class_id, class_count in sorted_histogram:
    # Class 0 is displayed as starting at 0 rather than at 10**0 == 1
    range_start = 10 ** class_id if class_id > 0 else 0
    range_end = 10 ** (class_id + 1) if class_id >= 0 else 10
    print(f"{class_id}\t[{range_start}-{range_end}[\t{class_count}")

# Format as in the expected output
print(sorted_histogram)

# Define the output directory
output_dir = "/content/cleaned_data_ex4"

# saveAsTextFile refuses to write into an existing directory, so drop the
# output of any previous run first.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Save the clean data as a single part file
validCities.coalesce(1).saveAsTextFile(output_dir)
print(f"\nCleaned data saved to: {output_dir}/part-00000")

Using existing SparkContext

===== Population Statistics =====
(count: 47980, mean: 47719.57063359733, stdev: 302885.5592040371, max: 31480498.0, min: 7.0)

===== Population Histogram (Logarithmic Scale) =====
Class	Range		Count
0	[0-10[	5
1	[10-100[	174
2	[100-1000[	2187
3	[1000-10000[	20537
4	[10000-100000[	21550
5	[100000-1000000[	3248
6	[1000000-10000000[	269
7	[10000000-100000000[	10
[(0, 5), (1, 174), (2, 2187), (3, 20537), (4, 21550), (5, 3248), (6, 269), (7, 10)]

Cleaned data saved to: /content/cleaned_data_ex4/part-00000


In [17]:
# Exercise 5: Top K Cities by Population

# Import required libraries
from pyspark import SparkContext, SparkConf
import math
import os
import shutil  # For removing directories

# Reuse the SparkContext already bound to `sc` in this kernel, otherwise obtain one.
try:
    sc
except NameError:
    # `sc` is unbound, but a context may still be active in this process;
    # the SparkContext constructor would raise in that case, while
    # getOrCreate() returns the active context or builds one from `conf`.
    conf = SparkConf().setAppName("TopCitiesByPopulation").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    print("Created new SparkContext")
else:
    print("Using existing SparkContext")

# Path to the data file
worldcities_path = "/content/sample_data/worldcitiespop.txt"

# Read the file into an RDD; the first line is the CSV header
rddCities = sc.textFile(worldcities_path)
header = rddCities.first()

# Keep only data lines whose 5th field (index 4, Population) is made of digits.
# Cached: every action in this cell (statistics, histogram, top-k, save)
# starts from this RDD, and without caching each one re-reads and re-filters
# the input file.
validCities = rddCities.filter(lambda line:
                              line != header and
                              len(line.split(",")) > 4 and
                              line.split(",")[4].strip() != "" and
                              line.split(",")[4].strip().isdigit()).cache()

# Pair each line with its population: (population, original line)
city_with_pop = validCities.map(lambda line: (int(line.split(",")[4]), line))

# Calculate statistics for populations; the variance divides by `count`,
# i.e. it is the population variance.
populations = city_with_pop.map(lambda x: x[0])
count = populations.count()
min_pop = populations.min()
max_pop = populations.max()
sum_pop = populations.sum()
mean_pop = sum_pop / count
variance = populations.map(lambda x: (x - mean_pop) ** 2).sum() / count
stdev = math.sqrt(variance)

# Print the statistics
print("\n===== Population Statistics =====")
print(f"(count: {count}, mean: {mean_pop}, stdev: {stdev}, max: {float(max_pop)}, min: {float(min_pop)})")

# Calculate the histogram with logarithmic scale
def get_log_class(population):
    """Return the base-10 logarithmic class of a population.

    Class k covers [10**k, 10**(k+1)[; class 0 additionally contains 0.

    Args:
        population: Non-negative number (the callers in this cell pass ints).

    Returns:
        int: The class index.

    Raises:
        ValueError: If population is negative.
    """
    if population == 0:
        return 0
    if population < 0:
        raise ValueError("population must be non-negative")
    if isinstance(population, int):
        # Digit counting is exact for ints of any size; int(math.log10(n))
        # is off by one when n rounds up to a power of ten as a float.
        return len(str(int(population))) - 1
    # Non-integer input: keep the floating-point computation.
    return int(math.log10(population))

# Count cities per logarithmic class and print the sorted (class, count) list
histogram_data = populations.map(lambda pop: (get_log_class(pop), 1))
histogram = histogram_data.reduceByKey(lambda a, b: a + b)
sorted_histogram = histogram.sortByKey().collect()
print(sorted_histogram)

# Get the top 10 cities by population.
# takeOrdered keeps only the 10 best candidates per partition and merges them
# on the driver, so the whole dataset no longer has to be shuffled and fully
# sorted just to read its first 10 elements. Negating the key gives
# descending order.
top_cities = city_with_pop.takeOrdered(10, key=lambda pair: -pair[0])

# Print the top 10 cities
print("\n===== Top 10 Cities by Population =====")
for i, (pop, city_line) in enumerate(top_cities, 1):
    fields = city_line.split(",")
    country_code = fields[0]
    city_name = fields[1]
    accent_city = fields[2]
    region = fields[3]
    population = fields[4]
    # The filter only guarantees 5 fields, so coordinates may be absent
    latitude = fields[5] if len(fields) > 5 else "N/A"
    longitude = fields[6] if len(fields) > 6 else "N/A"

    print(f"{i}. {country_code},{city_name},{accent_city},{region},{population},{latitude},{longitude}")

# Define the output directory
output_dir = "/content/cleaned_data_ex5"

# saveAsTextFile refuses to write into an existing directory, so drop the
# output of any previous run first.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Save the clean data as a single part file
validCities.coalesce(1).saveAsTextFile(output_dir)
print(f"\nCleaned data saved to: {output_dir}/part-00000")

Using existing SparkContext

===== Population Statistics =====
(count: 47980, mean: 47719.57063359733, stdev: 302885.5592040371, max: 31480498.0, min: 7.0)
[(0, 5), (1, 174), (2, 2187), (3, 20537), (4, 21550), (5, 3248), (6, 269), (7, 10)]

===== Top 10 Cities by Population =====
1. jp,tokyo,Tokyo,40,31480498,35.685,139.751389
2. cn,shanghai,Shanghai,23,14608512,31.045556,121.399722
3. in,bombay,Bombay,16,12692717,18.975,72.825833
4. pk,karachi,Karachi,05,11627378,24.9056,67.0822
5. in,delhi,Delhi,07,10928270,28.666667,77.216667
6. in,new delhi,New Delhi,07,10928270,28.6,77.2
7. ph,manila,Manila,D9,10443877,14.6042,120.9822
8. ru,moscow,Moscow,48,10381288,55.752222,37.615556
9. kr,seoul,Seoul,11,10323448,37.5985,126.9783
10. br,sao paulo,S�o Paulo,27,10021437,-23.473293,-46.665803

Cleaned data saved to: /content/cleaned_data_ex5/part-00000


In [18]:
# Exercise 6: Re-cleaning - Removing Duplicates

# Import required libraries
from pyspark import SparkContext, SparkConf
import math
import os
import shutil  # For removing directories

# Reuse the SparkContext already bound to `sc` in this kernel, otherwise obtain one.
try:
    sc
except NameError:
    # `sc` is unbound, but a context may still be active in this process;
    # the SparkContext constructor would raise in that case, while
    # getOrCreate() returns the active context or builds one from `conf`.
    conf = SparkConf().setAppName("CitiesDeduplication").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    print("Created new SparkContext")
else:
    print("Using existing SparkContext")

# Location of the raw dataset
worldcities_path = "/content/sample_data/worldcitiespop.txt"

# Load the file as an RDD of lines and remember its first line (the CSV header)
rddCities = sc.textFile(worldcities_path)
header = rddCities.first()

# Discard the header, rows with fewer than five fields, and rows whose
# Population field (index 4) is blank or not purely digits.
validCities = rddCities.filter(
    lambda row: not (
        row == header
        or len(row.split(",")) <= 4
        or row.split(",")[4].strip() == ""
        or not row.split(",")[4].strip().isdigit()
    )
)

# Build a grouping key so that records describing the same place collide.
def create_location_key(fields):
    """Create a deduplication key for one city record.

    The key is "<country>_<lowercased city>", extended with the latitude and
    longitude rounded to one decimal place (roughly 11 km) when both
    coordinates are present and parse as floats. Records are bucketed by
    rounding, so two nearby points on either side of a rounding boundary
    still get different keys.

    Args:
        fields: The comma-split fields of a record, in the order
            Country, City, AccentCity, Region, Population, Latitude, Longitude.

    Returns:
        str: The location key. A record with a single field yields that field.

    Raises:
        IndexError: If fields is empty.
    """
    if len(fields) < 2:
        # Degenerate record: only the country code is available.
        return fields[0]

    country = fields[0]
    city_name = fields[1].lower()  # Normalize city name to lowercase

    # If we have coordinates, add them (rounded) to make the key more specific
    if len(fields) >= 7 and fields[5] and fields[6]:
        try:
            # Adding 0.0 turns a rounded -0.0 into 0.0, so that e.g. -0.04 and
            # 0.04 (same bucket) do not produce the distinct keys "-0.0"/"0.0".
            lat = round(float(fields[5]), 1) + 0.0
            lon = round(float(fields[6]), 1) + 0.0
        except ValueError:
            # Unparseable coordinates: fall through to the name-only key
            pass
        else:
            return f"{country}_{city_name}_{lat}_{lon}"

    # If no valid coordinates, use country and city name
    return f"{country}_{city_name}"

# Parse cities into (location key, (population, original line))
parsed_cities = validCities.map(lambda line:
                               (create_location_key(line.split(",")),
                                (int(line.split(",")[4]), line)))

# For each location keep the record with the highest population.
# reduceByKey needs a commutative and associative function; taking the max of
# the (population, line) tuples satisfies that, and it breaks population ties
# by line text so the surviving record does not depend on partition order.
deduplicated_cities = parsed_cities.reduceByKey(max)

# Extract the city lines (dropping the location key). Cached because the
# statistics, histogram, top-k and save steps all start from this RDD.
clean_cities = deduplicated_cities.map(lambda x: x[1][1]).cache()

# Extract the population values for statistics
populations = clean_cities.map(lambda line: int(line.split(",")[4]))

# Calculate statistics for populations; the variance divides by `count`,
# i.e. it is the population variance.
count = populations.count()
min_pop = populations.min()
max_pop = populations.max()
sum_pop = populations.sum()
mean_pop = sum_pop / count
variance = populations.map(lambda x: (x - mean_pop) ** 2).sum() / count
stdev = math.sqrt(variance)

# Print the statistics (after deduplication)
print("\n===== Population Statistics (After Deduplication) =====")
print(f"(count: {count}, mean: {mean_pop}, stdev: {stdev}, max: {float(max_pop)}, min: {float(min_pop)})")

# Calculate the histogram with logarithmic scale
def get_log_class(population):
    """Return the base-10 logarithmic class of a population.

    Class k covers [10**k, 10**(k+1)[; class 0 additionally contains 0.

    Args:
        population: Non-negative number (the callers in this cell pass ints).

    Returns:
        int: The class index.

    Raises:
        ValueError: If population is negative.
    """
    if population == 0:
        return 0
    if population < 0:
        raise ValueError("population must be non-negative")
    if isinstance(population, int):
        # Exact for any int; int(math.log10(n)) can land one class too high
        # for ints that a float cannot distinguish from a power of ten.
        return len(str(int(population))) - 1
    # Non-integer input: keep the floating-point computation.
    return int(math.log10(population))

# Count cities per logarithmic class and print the sorted (class, count) list
histogram_data = populations.map(lambda pop: (get_log_class(pop), 1))
histogram = histogram_data.reduceByKey(lambda a, b: a + b)
sorted_histogram = histogram.sortByKey().collect()
print(sorted_histogram)

# Get the top 20 cities by population (to match expected output).
# takeOrdered keeps only the 20 best candidates per partition and merges them
# on the driver, avoiding a full shuffle-and-sort of the dataset. Negating
# the key gives descending order.
city_with_pop = clean_cities.map(lambda line: (int(line.split(",")[4]), line))
top_cities = city_with_pop.takeOrdered(20, key=lambda pair: -pair[0])

# Print the top 20 cities in the format from the expected output
print("\n===== Top 20 Cities by Population (After Deduplication) =====")
for pop, city_line in top_cities:
    fields = city_line.split(",")
    # Records without both coordinates (fewer than 7 fields) are not printed
    if len(fields) >= 7:
        country_code = fields[0]
        city_name = fields[1]
        accent_city = fields[2]
        region = fields[3]
        population = fields[4]
        latitude = fields[5]
        longitude = fields[6]

        print(f"{country_code},{city_name},{accent_city},{region},{population},{latitude},{longitude}")

# Define the output directory
output_dir = "/content/deduplicated_data_ex6"

# saveAsTextFile refuses to write into an existing directory, so drop the
# output of any previous run first.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Save the deduplicated data as a single part file
clean_cities.coalesce(1).saveAsTextFile(output_dir)
print(f"\nDeduplicated data saved to: {output_dir}/part-00000")

Using existing SparkContext

===== Population Statistics (After Deduplication) =====
(count: 47932, mean: 47706.636923141115, stdev: 302954.5879980082, max: 31480498.0, min: 7.0)
[(0, 5), (1, 174), (2, 2186), (3, 20506), (4, 21540), (5, 3243), (6, 268), (7, 10)]

===== Top 20 Cities by Population (After Deduplication) =====
jp,tokyo,Tokyo,40,31480498,35.685,139.751389
cn,shanghai,Shanghai,23,14608512,31.045556,121.399722
in,bombay,Bombay,16,12692717,18.975,72.825833
pk,karachi,Karachi,05,11627378,24.9056,67.0822
in,delhi,Delhi,07,10928270,28.666667,77.216667
in,new delhi,New Delhi,07,10928270,28.6,77.2
ph,manila,Manila,D9,10443877,14.6042,120.9822
ru,moscow,Moscow,48,10381288,55.752222,37.615556
kr,seoul,Seoul,11,10323448,37.5985,126.9783
br,sao paulo,S�o Paulo,27,10021437,-23.473293,-46.665803
tr,istanbul,Istanbul,34,9797536,41.018611,28.964722
ng,lagos,Lagos,05,8789133,6.453056,3.395833
mx,mexico,Mexico,09,8720916,19.434167,-99.138611
id,jakarta,Jakarta,04,8540306,-6.174444,106.82944