In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from os.path import abspath

spark = SparkSession.builder.getOrCreate()
print(SparkConf().getAll())

In [4]:
# only to see some settings of spark
print(SparkConf().getAll())

[('spark.master', 'local[*]'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell')]


In [5]:
print(spark.sparkContext.setLogLevel("INFO"))

None


In [13]:
# path of the files
country_path = "/data/country/*.csv"
population_path = "/data/population/*.csv"

In [7]:
# open the country table
df_country = spark.read \
                          .format('csv') \
                          .option("inferSchema", "True") \
                          .option('header', 'True') \
                          .csv(country_path)

In [14]:
# open the population world table
df_population = spark.read \
                          .format('csv') \
                          .option("inferSchema", "True") \
                          .option('header', 'True') \
                          .csv(population_path)

In [11]:
# show the country table
df_country.show()

+----+-------------+-----------------+-------------+
|CCA3|      Country|          Capital|    Continent|
+----+-------------+-----------------+-------------+
| JAM|      Jamaica|         Kingston|North America|
| JPN|        Japan|            Tokyo|         Asia|
| JEY|       Jersey|     Saint Helier|       Europe|
| JOR|       Jordan|            Amman|         Asia|
| KAZ|   Kazakhstan|        Nursultan|         Asia|
| KEN|        Kenya|          Nairobi|       Africa|
| KIR|     Kiribati|           Tarawa|      Oceania|
| KWT|       Kuwait|      Kuwait City|         Asia|
| KGZ|   Kyrgyzstan|          Bishkek|         Asia|
| LAO|         Laos|        Vientiane|         Asia|
| LVA|       Latvia|             Riga|       Europe|
| LBN|      Lebanon|           Beirut|         Asia|
| LSO|      Lesotho|           Maseru|       Africa|
| LBR|      Liberia|         Monrovia|       Africa|
| LBY|        Libya|          Tripoli|       Africa|
| LIE|Liechtenstein|            Vaduz|       E

In [15]:
# show the population table
df_population.show()

+----+----+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+-----------------+-----------+---------------------------+
|Rank|CCA3|2022_Population|2020_Population|2015_Population|2010_Population|2000_Population|1990_Population|1980_Population|1970_Population|Area_(km²)|Density_(per km²)|Growth_Rate|World_Population_Percentage|
+----+----+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+-----------------+-----------+---------------------------+
|  36| AFG|       41128771|       38972230|       33753499|       28189672|       19542982|       10694796|       12486631|       10752971|    652230|          63.0587|     1.0257|                       0.52|
| 138| ALB|        2842321|        2866849|        2882481|        2913399|        3182021|        3295066|        2941651|        2324731|     28748|          98.8

In [16]:
# defining the DataFrames to spark.sql
df_country.createOrReplaceTempView("country")
df_population.createOrReplaceTempView("population")

In [17]:
# spark.sql allows run any sql queries
spark.sql("select * from country").show()

+----+-------------+-----------------+-------------+
|CCA3|      Country|          Capital|    Continent|
+----+-------------+-----------------+-------------+
| JAM|      Jamaica|         Kingston|North America|
| JPN|        Japan|            Tokyo|         Asia|
| JEY|       Jersey|     Saint Helier|       Europe|
| JOR|       Jordan|            Amman|         Asia|
| KAZ|   Kazakhstan|        Nursultan|         Asia|
| KEN|        Kenya|          Nairobi|       Africa|
| KIR|     Kiribati|           Tarawa|      Oceania|
| KWT|       Kuwait|      Kuwait City|         Asia|
| KGZ|   Kyrgyzstan|          Bishkek|         Asia|
| LAO|         Laos|        Vientiane|         Asia|
| LVA|       Latvia|             Riga|       Europe|
| LBN|      Lebanon|           Beirut|         Asia|
| LSO|      Lesotho|           Maseru|       Africa|
| LBR|      Liberia|         Monrovia|       Africa|
| LBY|        Libya|          Tripoli|       Africa|
| LIE|Liechtenstein|            Vaduz|       E

In [19]:
# the join with country and population tables using the CCA3 column
spark.sql('select c.Country, c.Capital, c.Continent, p.* from country c, population p where c.CCA3 = p.CCA3').show()

+-------------------+----------------+-------------+----+----+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+-----------------+-----------+---------------------------+
|            Country|         Capital|    Continent|Rank|CCA3|2022_Population|2020_Population|2015_Population|2010_Population|2000_Population|1990_Population|1980_Population|1970_Population|Area_(km²)|Density_(per km²)|Growth_Rate|World_Population_Percentage|
+-------------------+----------------+-------------+----+----+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+-----------------+-----------+---------------------------+
|        Afghanistan|           Kabul|         Asia|  36| AFG|       41128771|       38972230|       33753499|       28189672|       19542982|       10694796|       12486631|       10752971|    652230|          63.0587| 

In [21]:
# transform the sql querie result for a DataFrame
df_join = spark.sql('select c.Country, c.Capital, c.Continent, p.* from country c, population p where c.CCA3 = p.CCA3')

In [22]:
# count line
df_join.count()

234

In [25]:
df_join = df_join.withColumnRenamed('Area_(km²)', 'area')

In [27]:
df_join = df_join.withColumnRenamed('Density_(per km²)', 'density_per_km')

In [28]:
# save in csv 
df_join.write.format('parquet').mode('overwrite').save('/processed')