In [133]:
# Инициируем сессию
from pyspark.sql import SparkSession

spark = (SparkSession.builder
.master("local[*]")
.appName('PySpark_Tutorial')
.getOrCreate()
)

In [156]:
# Читаем JSON файл со странами
countries_json_file = '../Countries-Cities/world_population.json'
df_countries = spark.read.option('header', True).json(countries_json_file)

In [213]:
# Читаем CSV файл с городами
cities_csv_file = '../Countries-Cities/world_cities.csv'
df_cities = spark.read.option('header', True).csv(cities_csv_file)

In [199]:
# Читаем CSV файл со справочником городов
codes_csv_file = '../Countries-Cities/CountryCodes.csv'
df_codes = spark.read.option('header', True).csv(codes_csv_file)

In [211]:
temp = df_cities.select(
    f.trim(f.col("city_ascii")).alias("city"),
    f.expr("lat").cast("decimal(10,4)").alias("lat"),
    f.expr("lng").cast("decimal(10,4)").alias("lng"),
    f.trim(f.col("country")).alias("country"),
    f.trim(f.col("admin_name")).alias("admin_name"),
    f.trim(f.col("capital")).alias("capital"),
    f.trim(f.col("population")).alias("population"),
    f.trim(f.col("id")).alias("id")                            
)

In [202]:
df_cities.show(3, vertical=True)

-RECORD 0----------------
 city       | Tokyo      
 city_ascii | Tokyo      
 lat        | 35.6839    
 lng        | 139.7744   
 country    | Japan      
 iso2       | JP         
 iso3       | JPN        
 admin_name | Tōkyō      
 capital    | primary    
 population | 39105000   
 id         | 1392685764 
-RECORD 1----------------
 city       | Jakarta    
 city_ascii | Jakarta    
 lat        | -6.2146    
 lng        | 106.8451   
 country    | Indonesia  
 iso2       | ID         
 iso3       | IDN        
 admin_name | Jakarta    
 capital    | primary    
 population | 35362000   
 id         | 1360771077 
-RECORD 2----------------
 city       | Delhi      
 city_ascii | Delhi      
 lat        | 28.6667    
 lng        | 77.2167    
 country    | India      
 iso2       | IN         
 iso3       | IND        
 admin_name | Delhi      
 capital    | admin      
 population | 31870000   
 id         | 1356872604 
only showing top 3 rows



In [190]:
# Подготавливаем датафрейм со странами
from pyspark.sql import functions as f
countryDf = df_countries.select(
    f.trim(f.col("CCA3")).alias("CCA3"),
    f.trim(f.col("Capital")).alias("Capital"),
    f.trim(f.col("Continent")).alias("Continent"),    
    f.trim(f.col("Country/Territory")).alias("Country"),    
    f.expr("`Area (sq mi)` * 2.59").cast("decimal(10,4)").alias("Area (km²)"),
    f.trim(f.col("Population")).cast("decimal(10,0)").alias("Population"),        
    f.expr("`Density (per sq mi)` * 0.3861").cast("decimal(10,4)").alias("Density (per km²)")                            
)

In [197]:
# Подготавливаем датафрейм с городами по странам
citiesDf = df_cities.fillna(0).groupBy("country") \
    .agg(f.count("city_ascii").alias("cities"), f.sum("population").cast("decimal").alias("urban_population"))

In [None]:
# Готовим результирующую таблицу
resultDf = 

In [198]:
citiesDf.show(50, vertical=True)

-RECORD 0--------------------------------
 country          | Côte d'Ivoire        
 cities           | 26                   
 urban_population | 8208356              
-RECORD 1--------------------------------
 country          | Chad                 
 cities           | 30                   
 urban_population | 2155206              
-RECORD 2--------------------------------
 country          | Russia               
 cities           | 1505                 
 urban_population | 109305229            
-RECORD 3--------------------------------
 country          | Paraguay             
 cities           | 44                   
 urban_population | 2187731              
-RECORD 4--------------------------------
 country          | Anguilla             
 cities           | 1                    
 urban_population | null                 
-RECORD 5--------------------------------
 country          | Yemen                
 cities           | 42                   
 urban_population | 6639126       

In [186]:
df_countries.show(2, vertical=True)

-RECORD 0------------------------------------
 Area (sq mi)                | 251826.003    
 CCA3                        | AFG           
 Capital                     | Kabul         
 Continent                   | Asia          
 Country/Territory           | Afghanistan   
 Density (per sq mi)         | 163.321402413 
 Population                  | 41128771      
 World Population Percentage | 0.52          
-RECORD 1------------------------------------
 Area (sq mi)                | 11099.6028    
 CCA3                        | ALB           
 Capital                     | Tirana        
 Continent                   | Europe        
 Country/Territory           | Albania       
 Density (per sq mi)         | 256.072829298 
 Population                  | 2842321       
 World Population Percentage | 0.04          
only showing top 2 rows



In [146]:
df_countries.printSchema()

root
 |-- Area (sq mi): double (nullable = true)
 |-- CCA3: string (nullable = true)
 |-- Capital: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Country/Territory: string (nullable = true)
 |-- Density (per sq mi): double (nullable = true)
 |-- Population: string (nullable = true)
 |-- World Population Percentage: string (nullable = true)



In [75]:
# Приводим код страны к прописным буквам для JOIN'а с таблицей городов
from pyspark.sql.functions import upper
df_countries = df_countries.withColumn("CCA3", upper(df_countries["CCA3"]))

In [78]:
# Конвертируем квадратные мили в квадратные километры в столбцах с площадью и плотность населения
df_countries = df_countries.withColumn("Density (per km²)", 2.58999 * df_countries["Density (per sq mi)"])
df_countries = df_countries.withColumn("Area (km²)", 2.58999 * df_countries["Area (sq mi)"])

In [111]:
# Выполняем JOIN двух таблиц
merged_df = df_cities.join(df_countries, df_cities.iso3 == df_countries.CCA3, how = 'left')

In [117]:
# Оставляем только необходимые столбцы 
merged_df = merged_df.select("city_ascii","country","lat","lng","population")
# merged_df.withColumn("population", merged_df["population"].cast(int()))
merged_df = merged_df.withColumn("population", merged_df["population"].cast("integer"))

In [119]:
merged_df.show()

+------------+-------------+--------+--------+----------+
|  city_ascii|      country|     lat|     lng|population|
+------------+-------------+--------+--------+----------+
|       Tokyo|        Japan| 35.6839|139.7744|  39105000|
|     Jakarta|    Indonesia| -6.2146|106.8451|  35362000|
|       Delhi|        India| 28.6667| 77.2167|  31870000|
|      Manila|  Philippines| 14.6000|120.9833|  23971000|
|   Sao Paulo|       Brazil|-23.5504|-46.6339|  22495000|
|       Seoul|  South Korea| 37.5600|126.9900|  22394000|
|      Mumbai|        India| 19.0758| 72.8775|  22186000|
|    Shanghai|        China| 31.1667|121.4667|  22118000|
| Mexico City|       Mexico| 19.4333|-99.1333|  21505000|
|   Guangzhou|        China| 23.1288|113.2590|  21489000|
|       Cairo|        Egypt| 30.0444| 31.2358|  19787000|
|     Beijing|        China| 39.9040|116.4075|  19437000|
|    New York|United States| 40.6943|-73.9249|  18713220|
|     Kolkata|        India| 22.5727| 88.3639|  18698000|
|      Moscow|

In [130]:
agregate = merged_df.groupBy("country").sum("population") #.sort("population")

In [132]:
agregate.show()

AnalysisException: Column 'population' does not exist. Did you mean one of the following? [sum(population), country];
'Sort ['population ASC NULLS FIRST], true
+- Aggregate [country#3098], [country#3098, sum(population#4617) AS sum(population)#4801L]
   +- Project [city_ascii#3095, country#3098, lat#3096, lng#3097, cast(population#3103 as int) AS population#4617]
      +- Project [city_ascii#3095, country#3098, lat#3096, lng#3097, population#3103]
         +- Project [city_ascii#3095, country#3098, lat#3096, lng#3097, population#3103]
            +- Join LeftOuter, (iso3#3100 = CCA3#3203)
               :- Relation [city#3094,city_ascii#3095,lat#3096,lng#3097,country#3098,iso2#3099,iso3#3100,admin_name#3101,capital#3102,population#3103,id#3104] csv
               +- Project [1970 Population#3043, 1980 Population#3044, 1990 Population#3045, 2000 Population#3046, 2010 Population#3047, 2015 Population#3048, 2020 Population#3049, 2022 Population#3050, Area (sq mi)#3051, CCA3#3203, Capital#3053, Continent#3054, Country/Territory#3055, Density (per sq mi)#3056, Growth Rate#3057, Rank#3058, World Population Percentage#3059, Density (per km²)#3307, (Area (sq mi)#3051 * 2.58999) AS Area (km²)#3326]
                  +- Project [1970 Population#3043, 1980 Population#3044, 1990 Population#3045, 2000 Population#3046, 2010 Population#3047, 2015 Population#3048, 2020 Population#3049, 2022 Population#3050, Area (sq mi)#3051, CCA3#3203, Capital#3053, Continent#3054, Country/Territory#3055, Density (per sq mi)#3056, Growth Rate#3057, Rank#3058, World Population Percentage#3059, (Density (per sq mi)#3056 * 2.58999) AS Density (per km²)#3307]
                     +- Project [1970 Population#3043, 1980 Population#3044, 1990 Population#3045, 2000 Population#3046, 2010 Population#3047, 2015 Population#3048, 2020 Population#3049, 2022 Population#3050, Area (sq mi)#3051, upper(CCA3#3052) AS CCA3#3203, Capital#3053, Continent#3054, Country/Territory#3055, Density (per sq mi)#3056, Growth Rate#3057, Rank#3058, World Population Percentage#3059]
                        +- Relation [1970 Population#3043,1980 Population#3044,1990 Population#3045,2000 Population#3046,2010 Population#3047,2015 Population#3048,2020 Population#3049,2022 Population#3050,Area (sq mi)#3051,CCA3#3052,Capital#3053,Continent#3054,Country/Territory#3055,Density (per sq mi)#3056,Growth Rate#3057,Rank#3058,World Population Percentage#3059] json


23/02/05 16:04:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1444464 ms exceeds timeout 120000 ms
23/02/05 16:04:23 WARN SparkContext: Killing executors is not supported by current scheduler.


In [83]:
merged_df.show(10, vertical=True)

23/02/05 14:41:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
-RECORD 0-----------------------------------------
 city                        | Tokyo              
 city_ascii                  | Tokyo              
 lat                         | 35.6839            
 lng                         | 139.7744           
 country                     | Japan              
 iso2                        | JP                 
 iso3                        | JPN                
 admin_name                  | Tōkyō              
 capital                     | primary            
 population                  | 39105000           
 id                          | 1392685764         
 1970 Population             | 105416839          
 1980 Population             | 117624196          
 1990 Population             | 123686321          
 2000 Population             | 126803861          
 201

In [81]:
df_countries.show(10, vertical=True)

-RECORD 0------------------------------------------
 1970 Population             | 10752971            
 1980 Population             | 12486631            
 1990 Population             | 10694796            
 2000 Population             | 19542982            
 2010 Population             | 28189672            
 2015 Population             | 33753499            
 2020 Population             | 38972230            
 2022 Population             | 41128771            
 Area (sq mi)                | 251827.41086614216  
 CCA3                        | AFG                 
 Capital                     | kabul               
 Continent                   | asia                
 Country/Territory           | afghanistan         
 Density (per sq mi)         | 24.347100184880794  
 Growth Rate                 | 1.0257              
 Rank                        | 36                  
 World Population Percentage | 0.52                
 Density (per km²)           | 63.0587460078394    
 Area (km²) 

In [80]:
df_cities.show(10, vertical=True)

-RECORD 0----------------------
 city       | Tokyo            
 city_ascii | Tokyo            
 lat        | 35.6839          
 lng        | 139.7744         
 country    | Japan            
 iso2       | JP               
 iso3       | JPN              
 admin_name | Tōkyō            
 capital    | primary          
 population | 39105000         
 id         | 1392685764       
-RECORD 1----------------------
 city       | Jakarta          
 city_ascii | Jakarta          
 lat        | -6.2146          
 lng        | 106.8451         
 country    | Indonesia        
 iso2       | ID               
 iso3       | IDN              
 admin_name | Jakarta          
 capital    | primary          
 population | 35362000         
 id         | 1360771077       
-RECORD 2----------------------
 city       | Delhi            
 city_ascii | Delhi            
 lat        | 28.6667          
 lng        | 77.2167          
 country    | India            
 iso2       | IN               
 iso3   

In [136]:
cd ../Countries-Cities

/Users/pk/Library/CloudStorage/OneDrive-Личная/GitHub/Countries-Cities
