In [2]:
# Detect whether the notebook runs on Google Colab by probing for the
# `google.colab` package. Only ImportError is handled so that unrelated
# problems (e.g. a KeyboardInterrupt during the import) are not silently
# turned into IN_COLAB = False.
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [3]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"

In [4]:
# findspark is initialised before the pyspark imports in the following cell.
# NOTE(review): presumably it locates Spark through SPARK_HOME, which the setup
# cell only sets when running on Colab — confirm how local runs are configured.
import findspark
findspark.init()

In [5]:
# Spark master URL; handed to SparkSession.builder.master() when the session
# is created.
spark_url = 'local'

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [7]:
# Create the notebook's Spark session (getOrCreate returns an existing one if
# a session is already active) against the master configured in `spark_url`.
spark = (
    SparkSession.builder
    .master(spark_url)
    .appName('Spark SQL')
    .getOrCreate()
)

In [1]:
!pip install geopy



In [9]:
from pyspark.sql.functions import col, sum as spark_sum, udf
from pyspark.sql.types import StructType, StructField, DoubleType
from geopy.geocoders import Nominatim
import ssl

# WARNING: swaps the standard library's default HTTPS context factory for the
# unverified one, i.e. certificate checks are skipped. Not recommended for
# production use.
# NOTE(review): confirm this workaround is still needed before keeping it.
ssl._create_default_https_context = ssl._create_unverified_context

# Nominatim client used by geocode_city().
geolocator = Nominatim(user_agent="city_geocoder")

# Prefix shared by the input and output file names.
path = '2018'

# Load "<path>_city.csv": first row is the header, column types are inferred.
df = spark.read.csv(
    f'{path}_city.csv',
    header=True,
    inferSchema=True,
)

# Total `author_count` per city, largest total first.
sorted_df = (
    df.groupBy("city")
    .agg(spark_sum("author_count").alias("sum"))
    .orderBy(col("sum").desc())
)

# Define a UDF for geocoding
def geocode_city(city, geocoder=None, timeout=10):
    """Look up the latitude/longitude of a city name.

    Args:
        city: Name of the city to geocode. ``None`` or a blank value is
            answered with empty coordinates without contacting the service.
        geocoder: Object exposing ``geocode(query, timeout=...)``. Defaults to
            the module-level ``geolocator``.
        timeout: Seconds to wait for the geocoding service. geopy expresses
            timeouts in seconds, so the earlier value of 10000 allowed a
            single request to block for hours.

    Returns:
        dict with keys ``"latitude"`` and ``"longitude"``; both values are
        ``None`` when the city is blank, unknown to the service, or the
        lookup raised an error.
    """
    no_match = {"latitude": None, "longitude": None}

    # Nothing to look up: avoid a pointless network round trip.
    if city is None or not str(city).strip():
        return no_match

    # Resolve the default lazily so the global is only needed when no
    # geocoder is passed in explicitly.
    service = geolocator if geocoder is None else geocoder

    try:
        location = service.geocode(city, timeout=timeout)
    except Exception as e:
        # Best effort: one failed lookup must not abort the whole Spark job.
        # Log the error message and return None for both latitude and longitude
        print(f"Geocoding error for city {city}: {e}")
        return no_match

    if not location:
        return no_match
    return {"latitude": location.latitude, "longitude": location.longitude}

# Register the UDF
# Wrap geocode_city as a Spark UDF returning a struct of two nullable doubles.
# The lookup talks to an external service, so the UDF is flagged as
# non-deterministic: Spark treats UDFs as deterministic by default and may
# then invoke them more often than they appear in the query.
geocode_udf = udf(
    geocode_city,
    StructType([
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True)
    ])
).asNondeterministic()

# Apply the geocoding UDF to each city in the DataFrame
geocoded_df = sorted_df.withColumn("coordinates", geocode_udf(col("city")))

# Flatten the 'coordinates' struct into plain 'latitude' / 'longitude' columns.
# The result is cached because two actions follow (show and write); without
# the cache each action re-runs the plan and repeats the geocoding requests.
result_df = (
    geocoded_df
    .withColumn("latitude", col("coordinates.latitude"))
    .withColumn("longitude", col("coordinates.longitude"))
    .drop("coordinates")
    .cache()
)

# Show the resulting DataFrame
result_df.show(truncate=False)

# Save the resulting DataFrame as CSV (with header), replacing earlier output
result_df.write.csv(f'City/{path}_city_sum_coordinate.csv', header=True, mode="overwrite")


+-------+-----+----------+-----------+
|city   |sum  |latitude  |longitude  |
+-------+-----+----------+-----------+
|Hamburg|15319|53.550341 |10.000654  |
|Geneva |15303|46.2017559|6.1466014  |
|Batavia|11466|-6.175247 |106.8270488|
|Moscow |8584 |55.625578 |37.6063916 |
|Bangkok|8466 |13.7524938|100.4935089|
|Zurich |8035 |47.3744489|8.5410422  |
|Aachen |6935 |50.776351 |6.083862   |
|London |6691 |51.5074456|-0.1277653 |
|Bologna|6288 |44.4938203|11.3426327 |
|Bari   |6195 |41.1257843|16.8620293 |
+-------+-----+----------+-----------+

