In [3]:
import findspark
findspark.init()
import pyspark

# Describe the cluster connection once, then obtain the context through
# getOrCreate: constructing SparkContext directly raises "Cannot run multiple
# SparkContexts at once" when this cell is re-run in a kernel that already
# has a live context, whereas getOrCreate reuses it.
spark_conf = (
    pyspark.SparkConf()
    .setMaster('spark://hd-master:7077')
    .setAppName('big_data')
)
sc = pyspark.SparkContext.getOrCreate(conf=spark_conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/30 13:10:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
import folium
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from folium.plugins import MarkerCluster
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from pycountry_convert import country_alpha2_to_continent_code, country_name_to_country_alpha2
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
# import geopandas as gpd
# import seaborn as sns
# from fuzzywuzzy import process

# Confirmation message: this line is only reached if every import above succeeded.
print('Modules are imported.')

Modules are imported.


In [5]:
# SQLContext has been deprecated since Spark 3.0 in favour of SparkSession.
# The builder attaches to the SparkContext that is already running (`sc`),
# and the resulting object exposes the same `.read` entry point used below.
spark = SparkSession.builder.getOrCreate()



In [6]:
# Load the COVID-19 CSV data from HDFS: the first row holds the column names
# and column types are inferred from the values.
covid_source = "hdfs://hd-master:9000/covid-19"
covid_data = spark.read.csv(covid_source, header=True, inferSchema=True)

# Preview the first five rows without truncating cell contents.
covid_data.show(n=5, truncate=False)

                                                                                

+-----------+----------+---------+---------+------+
|country    |date      |confirmed|recovered|deaths|
+-----------+----------+---------+---------+------+
|afghanistan|2020-01-22|0        |0        |0     |
|afghanistan|2020-01-23|0        |0        |0     |
|afghanistan|2020-01-24|0        |0        |0     |
|afghanistan|2020-01-25|0        |0        |0     |
|afghanistan|2020-01-26|0        |0        |0     |
+-----------+----------+---------+---------+------+
only showing top 5 rows



In [7]:
# Derive the calendar year and month of every report from its date.
covid_data = (
    covid_data
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
)


def monthly_confirmed_by_country(year_data, year):
    """Sum `confirmed` per (country, month) for one year's rows.

    Parameters
    ----------
    year_data : pyspark.sql.DataFrame
        Rows already restricted to a single calendar year; must contain the
        columns `country`, `month` and `confirmed`.
    year : int
        The year the rows belong to; only used to name the output column.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns `country`, `month` and `total_confirmed_<year>`, ordered by
        country and month.
    """
    return (
        year_data
        .groupBy("country", "month")
        .agg(F.sum("confirmed").alias(f"total_confirmed_{year}"))
        .orderBy("country", "month")
    )


# NOTE(review): the printed totals are very large (e.g. about 8.5 million for
# Afghanistan in 2020), which suggests `confirmed` is a running cumulative
# count. If so, summing it adds every day's running total together rather
# than counting cases - confirm whether a per-period max/last value was
# intended. The aggregation is deliberately left unchanged here.

# One filtered frame and one monthly aggregate per year. The same helper is
# used for every year so the three pipelines cannot drift apart.
covid_data_2020 = covid_data.filter(covid_data.year == 2020)
total_confirmed_2020_monthly = monthly_confirmed_by_country(covid_data_2020, 2020)

covid_data_2021 = covid_data.filter(covid_data.year == 2021)
total_confirmed_2021_monthly = monthly_confirmed_by_country(covid_data_2021, 2021)

covid_data_2022 = covid_data.filter(covid_data.year == 2022)
total_confirmed_2022_monthly = monthly_confirmed_by_country(covid_data_2022, 2022)

# Show the result
# total_confirmed_2021_monthly.show()

# Combine the three years side by side. The outer joins keep (country, month)
# pairs that are present in only some of the years.
total_confirmed_all_years = (
    total_confirmed_2020_monthly
    .join(total_confirmed_2021_monthly, ["country", "month"], "outer")
    .join(total_confirmed_2022_monthly, ["country", "month"], "outer")
)

# Pairs missing from a year come out of the outer join as nulls; count them as 0.
total_confirmed_all_years = total_confirmed_all_years.fillna(0)

# Collapse the months: one row per country, one total column per year.
yearly_total_columns = [
    f"total_confirmed_{year}" for year in (2020, 2021, 2022)
]
total_confirmed_all_years = (
    total_confirmed_all_years
    .groupBy("country")
    .agg(*[F.sum(name).alias(name) for name in yearly_total_columns])
    .orderBy("country")
)

# Bring the per-country result to the driver as a pandas DataFrame.
df_all_years = total_confirmed_all_years.toPandas()

# Turn names such as "west_bank_and_gaza" into "West Bank And Gaza".
# regex=False makes the underscore a literal on every pandas version.
df_all_years['country'] = (
    df_all_years['country']
    .str.replace('_', ' ', regex=False)
    .str.title()
)

# Display the updated DataFrame
print(df_all_years)

                                                                                

                country  total_confirmed_2020  total_confirmed_2021  \
0           Afghanistan               8501751              39518380   
1               Albania               3727544              51106141   
2               Algeria              10583375              57172988   
3               Andorra                682822               5084330   
4                Angola               1343410              14797145   
..                  ...                   ...                   ...   
191             Vietnam                219337             135539309   
192  West Bank And Gaza               8753200             115974691   
193               Yemen                382306               2403947   
194              Zambia               2433883              52593029   
195            Zimbabwe               1336283              30065437   

     total_confirmed_2022  
0                67783564  
1               108004674  
2                96532073  
3                15527738  
4      

In [8]:
def get_continent(country):
    """Look up the ISO alpha-2 code and the continent code for a country name.

    Parameters
    ----------
    country : str
        Country name as understood by `country_name_to_country_alpha2`.

    Returns
    -------
    tuple
        `(alpha2_code, continent_code)`. Whichever lookup fails is reported
        as the string 'Unknown'; if the name itself cannot be resolved, both
        elements are 'Unknown'.
    """
    # Catch Exception rather than using a bare `except:` so that
    # KeyboardInterrupt / SystemExit still stop a long-running `apply`.
    try:
        cn_a2_code = country_name_to_country_alpha2(country)
    except Exception:
        # Without an alpha-2 code there is nothing to look the continent up by.
        return ('Unknown', 'Unknown')
    try:
        cn_continent = country_alpha2_to_continent_code(cn_a2_code)
    except Exception:
        cn_continent = 'Unknown'
    return (cn_a2_code, cn_continent)

In [9]:
# Resolve every country name to its (alpha-2 code, continent code) pair.
# Wrapping each pair in a Series makes `apply` return a two-column frame.
continent_lookup = df_all_years['country'].apply(
    lambda name: pd.Series(get_continent(name))
)

# Store the two looked-up values as new columns next to the yearly totals.
df_all_years[['country_alpha2', 'continent']] = continent_lookup

print(df_all_years)

                country  total_confirmed_2020  total_confirmed_2021  \
0           Afghanistan               8501751              39518380   
1               Albania               3727544              51106141   
2               Algeria              10583375              57172988   
3               Andorra                682822               5084330   
4                Angola               1343410              14797145   
..                  ...                   ...                   ...   
191             Vietnam                219337             135539309   
192  West Bank And Gaza               8753200             115974691   
193               Yemen                382306               2403947   
194              Zambia               2433883              52593029   
195            Zimbabwe               1336283              30065437   

     total_confirmed_2022 country_alpha2 continent  
0                67783564             AF        AS  
1               108004674             AL 

In [10]:
# Specify a custom user_agent
custom_user_agent = "my-application1"
geolocator = Nominatim(user_agent=custom_user_agent)

# The public Nominatim service asks clients to stay at or below one request
# per second. Every lookup therefore goes through geopy's RateLimiter, which
# spaces the calls out and retries transient failures, so that throttled
# requests do not silently turn into missing coordinates.
rate_limited_geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)


def geolocate(country):
    """Return the coordinates Nominatim reports for a country name.

    Parameters
    ----------
    country : str
        Free-text query, here a country name.

    Returns
    -------
    tuple or float
        `(latitude, longitude)` when the geocoder finds a match, otherwise
        `np.nan` (no match, or the lookup failed).
    """
    try:
        loc = rate_limited_geocode(country)
    except Exception:
        # Best effort: a failed lookup becomes a missing value. Exception,
        # not a bare `except:`, so Ctrl-C can still interrupt the batch.
        return np.nan
    if loc is None:
        # The geocoder returns None when nothing matches the query.
        return np.nan
    return (loc.latitude, loc.longitude)

In [None]:
# Apply the geolocate function to the 'country' column.
# geolocate returns a (lat, lon) tuple on success but a bare NaN on failure.
# Normalise every result to a pair first: otherwise, when every lookup fails
# (e.g. no network), the result has a single column and cannot be assigned to
# the two target columns.
country_coordinates = [
    coords if isinstance(coords, tuple) else (np.nan, np.nan)
    for coords in map(geolocate, df_all_years['country'])
]
df_all_years[['latitude', 'longitude']] = pd.DataFrame(
    country_coordinates,
    columns=['latitude', 'longitude'],
    index=df_all_years.index,
)

# Display the updated DataFrame
#print(df)

# Save the updated DataFrame to a CSV file
#df.to_csv('output_file.csv', index=False)

# Hand-entered (latitude, longitude) pairs that replace the geocoded values
# for the listed countries. Keys are matched by exact equality against
# df_all_years['country'], so they must use the same title-cased spelling as
# that column (e.g. 'Us', 'Korea North', 'Bosnia And Herzegovina').
# The two commented-out entries are disabled and are not applied.
corrections = {
    'Albania': (41.153332, 20.168331),
    'Antarctica': (-90.0, 0.0),
    'Bosnia And Herzegovina': (43.915886, 17.679076),
    'Brazil': (-14.235004, -51.92528),
    'China': (35.86166, 104.195397),
    'Germany': (51.165691, 10.451526),
    #'Guinea-Bissau': (11.803749, -15.180413),
    'Iran': (32.427908, 53.688046),
    'Korea North': (40.339852, 127.510093),
    'Kuwait': (29.375859, 47.977405),
    'Moldova': (47.411631, 28.369885),
    'Morocco': (31.791702, -7.09262),
    'Russia': (61.52401, 105.318756),
    'Seychelles': (-4.679574, 55.491977),
    #'Timor-Leste': (-8.874217, 125.727539),
    'Us': (37.09024, -95.712891),
    'Egypt': (26.820553, 30.802498),
    'Georgia': (42.3154, 43.3569),
    'Mongolia': (46.8625, 103.8467),
    'Sweden': (60.1282, 18.6435),
    'Greece': (39.0742, 21.8243),
    'Hungary': (47.1625, 19.5033),
    'Lebanon': (33.8547, 35.8623),
    'Israel': (31.0461, 34.8516),
    'Colombia': (4.5709, -74.2973),
    'Cambodia': (12.5657, 104.9910)
}


# Overwrite the coordinates of every country that has a manual correction.
# Countries that do not occur in the frame match no rows and are skipped.
coordinate_columns = ['latitude', 'longitude']
for corrected_name, corrected_coords in corrections.items():
    is_corrected_country = df_all_years['country'] == corrected_name
    df_all_years.loc[is_corrected_country, coordinate_columns] = corrected_coords

# Show the frame with the corrected coordinates in place.
print(df_all_years)


In [None]:
# Empty map, with a cluster layer that groups nearby markers when zoomed out.
world_map_all_years = folium.Map(tiles="cartodbpositron")
marker_cluster_all_years = MarkerCluster().add_to(world_map_all_years)

# Marker appearance is identical for every country, so define it once.
radius = 5
color = 'red'

# Only countries with both coordinates can be placed on the map. dropna
# treats NaN and None alike, whatever the column dtype.
located_countries = df_all_years.dropna(subset=['latitude', 'longitude'])

# One CircleMarker per located country, with the yearly totals in its popup.
for row in located_countries.itertuples(index=False):
    yearly_totals = (
        (2020, row.total_confirmed_2020),
        (2021, row.total_confirmed_2021),
        (2022, row.total_confirmed_2022),
    )
    # Each figure is wrapped in one <span><strong>...</strong></span> pair;
    # the tags are closed in the reverse order they were opened.
    popup_lines = [f"Country: {row.country}"]
    popup_lines.extend(
        f'Total confirmed {year}: '
        f'<span style="color: {color};"><strong>{total}</strong></span>'
        for year, total in yearly_totals
    )
    popup_text = "<br>".join(popup_lines) + "<br>"

    folium.CircleMarker(
        location=[row.latitude, row.longitude],
        radius=radius,
        popup=popup_text,
        fill=True,
    ).add_to(marker_cluster_all_years)

# Show the map
world_map_all_years

In [None]:
# Stop the SparkContext `sc` that was created in the first cell.
sc.stop()