<a href="https://colab.research.google.com/github/yowainwright/google-colab-notebooks/blob/main/pyspark_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**PySpark demo**

This notebook uses only built-in PySpark functions to find the country with the greatest population change from 1960 to 2021.

The goal of this demo is for me to gain familiarity with PySpark as a utility for quickly grabbing data and managing it in an ephemeral state with minimal effort.

While going through this process, I realized it was harder than I expected to do it all with just PySpark.

In [None]:
!pip install opendatasets
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import opendatasets as od
  
od.download(
    "https://www.kaggle.com/datasets/fredericksalazar/population-world-since-1960-to-2021")

Skipping, found downloaded files in "./population-world-since-1960-to-2021" (use force=True to force download)


In [None]:
from pyspark.sql import SparkSession

# Set up a basic form of the table.

# Create a Spark session.
spark = SparkSession.builder.appName("data_population")\
          .getOrCreate()

# Create a DataFrame from the CSV. No schema is inferred, so every column is read as a string.
df = spark.read.option("header", "true").csv('population-world-since-1960-to-2021/data_population_world.csv')
# Remove columns that are irrelevant to this work.
df = df.drop("Country Code", "Indicator Name", "Indicator Code", "_c66")
# Remove rows with a null country name or a null 1960 population.
df = df.filter(df['Country Name'].isNotNull() & df['1960'].isNotNull())

# Cache the dataset from here on.
df.cache()



DataFrame[Country Name: string, 1960: string, 1961: string, 1962: string, 1963: string, 1964: string, 1965: string, 1966: string, 1967: string, 1968: string, 1969: string, 1970: string, 1971: string, 1972: string, 1973: string, 1974: string, 1975: string, 1976: string, 1977: string, 1978: string, 1979: string, 1980: string, 1981: string, 1982: string, 1983: string, 1984: string, 1985: string, 1986: string, 1987: string, 1988: string, 1989: string, 1990: string, 1991: string, 1992: string, 1993: string, 1994: string, 1995: string, 1996: string, 1997: string, 1998: string, 1999: string, 2000: string, 2001: string, 2002: string, 2003: string, 2004: string, 2005: string, 2006: string, 2007: string, 2008: string, 2009: string, 2010: string, 2011: string, 2012: string, 2013: string, 2014: string, 2015: string, 2016: string, 2017: string, 2018: string, 2019: string, 2020: string, 2021: string]
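
The schema above lists every column as `string`, because the CSV was read without schema inference. Below is a minimal plain-Python sketch (no Spark needed) of why that matters if a raw year column is ever sorted or compared directly. The sample values are copied from the 1960 column shown in this notebook, and the variable names are illustrative only.

```python
# Population values as strings, the way the CSV reader delivers them here.
populations = ["54608", "8622466", "9443", "10276477"]

# Sorting strings compares character by character, so "9443" ranks highest.
as_strings = sorted(populations)

# Converting to int first gives the numeric order.
as_numbers = sorted(populations, key=int)

print(as_strings)
print(as_numbers)
```

In PySpark, one common remedy is to cast a column before ordering by it, for example `col("1960").cast("long")`.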

In [None]:
df.show()

+--------------------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|        Country Name|    1960|    1961|    1962|     1963|     1964|     1965|     1966|     1967|     1968|     1969|     1970|     1971|     1972|     1973|     1974|     1975|     1976|     1977|     1978|     1979|     1980|     1981|     1982|     1983|     1984|     1985|     1986|     1987|     1988|     1989|     1990|     1991|     1992|     1993| 

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import coalesce, col, lit

def process_minimum_variants(df):
  # Keep only the country name and the two endpoint years.
  return df.select("Country Name", "1960", "2021")

def process_country_population_growth(df):
  df = process_minimum_variants(df)
  # Despite the column name, this is the ratio 2021 / 1960 rounded to 2 places,
  # not a percentage: 1.95 means the population is 1.95 times its 1960 value.
  # A null 2021 value falls back to 0 and a null 1960 value falls back to 1.
  return df.withColumn('variance (percent %)', F.round(coalesce(col('2021'), lit(0)) / coalesce(col('1960'), lit(1)), 2))

df = process_country_population_growth(df)
df.show()




+--------------------+--------+---------+--------------------+
|        Country Name|    1960|     2021|variance (percent %)|
+--------------------+--------+---------+--------------------+
|               Aruba|   54608|   106537|                1.95|
|          Afganistán| 8622466| 40099462|                4.65|
|              Angola| 5357195| 34503774|                6.44|
|             Albania| 1608800|  2811666|                1.75|
|             Andorra|    9443|    79034|                8.37|
|      El mundo árabe|93359407|456520777|                4.89|
|Emiratos Árabes U...|  133426|  9365145|               70.19|
|           Argentina|20349744| 45808747|                2.25|
|             Armenia| 1904148|  2790974|                1.47|
|     Samoa Americana|   20085|    45035|                2.24|
|   Antigua y Barbuda|   55342|    93219|                1.68|
|           Australia|10276477| 25688079|                 2.5|
|             Austria| 7047539|  8955797|              
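
The `variance (percent %)` column holds the ratio of the 2021 population to the 1960 population. Below is a small plain-Python sketch of how that ratio relates to a relative percent change, using Aruba's figures from the table above. The function names `ratio` and `percent_change` are hypothetical and not part of the notebook.

```python
def ratio(start, end):
    """End value divided by start value, rounded to 2 places."""
    return round(end / start, 2)

def percent_change(start, end):
    """Relative growth in percent: (end - start) / start * 100."""
    return round((end - start) / start * 100, 2)

# Aruba's 1960 and 2021 populations, copied from the table above.
aruba_1960, aruba_2021 = 54608, 106537

print(ratio(aruba_1960, aruba_2021))           # 1.95
print(percent_change(aruba_1960, aruba_2021))  # 95.09
```

A ratio of 1.95 therefore corresponds to growth of roughly 95%, while multiplying the ratio by 100 gives the 2021 population as a percentage of the 1960 population (about 195%).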

In [None]:
# Sort ascending by the 2021/1960 ratio: the smallest ratios come first, the largest last.
greatest_population_change = df.orderBy("variance (percent %)")
greatest_population_change.show()

+--------------------+---------+---------+--------------------+
|        Country Name|     1960|     2021|variance (percent %)|
+--------------------+---------+---------+--------------------+
| Saint Kitts y Nevis|    56660|    47606|                0.84|
|            Bulgaria|  7867374|  6877743|                0.87|
|             Letonia|  2120979|  1884490|                0.89|
|             Croacia|  4140181|  3899000|                0.94|
|             Hungría|  9983967|  9709891|                0.97|
|Bosnia y Herzegovina|  3262539|  3270943|                 1.0|
|            Lituania|  2778550|  2800839|                1.01|
|             Ucrania| 42767251| 43792855|                1.02|
|             Georgia|  3645600|  3708610|                1.02|
|              Serbia|  6608000|  6834326|                1.03|
|             Rumania| 18406905| 19119880|                1.04|
|     República Checa|  9602006| 10505772|                1.09|
|             Estonia|  1211537|  133093

In [None]:
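def grab_tail_row_object(df):
  # Return the last row of the DataFrame as a plain dict.
  return df.tail(1)[0].asDict()

def convert_population_response(row):
  country = row["Country Name"]
  change = row["variance (percent %)"]
  return (country, change)

def construct_str(country, change):
  # `change` is the 2021/1960 ratio, so multiplying by 100 gives the 2021
  # population as a percentage of the 1960 population.
  percent_change = f"{change * 100:.1f}%"
  return f"{country} had the greatest population change with {percent_change}"

# Note: `df` is not sorted by the ratio, so tail(1) returns the last row in file
# order. Pass the sorted DataFrame from the previous cell instead to pick the
# row with the largest ratio.
greatest_population_change = grab_tail_row_object(df)
(country, change) = convert_population_response(greatest_population_change)
string = construct_str(country, change)
print(string)

Zimbabwe had the greatest population change with 420.0%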
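
`tail(1)` returns the last row of whatever DataFrame it is called on, so the result depends on row order. Below is a minimal plain-Python sketch of the difference between "last row" and "row with the largest ratio". It uses three rows copied from the outputs in this notebook (the Zimbabwe ratio of 4.2 is inferred from the printed 420.0%), and the helper names are hypothetical.

```python
# (country, ratio) pairs in file order; ratios copied from this notebook's outputs.
rows = [
    ("Aruba", 1.95),
    ("Andorra", 8.37),
    ("Zimbabwe", 4.2),
]

def last_row(rows):
    # Mirrors tail(1) on unsorted data: position decides, not the ratio.
    return rows[-1]

def largest_ratio_row(rows):
    # Sort ascending by ratio first, then take the last row.
    return sorted(rows, key=lambda row: row[1])[-1]

print(last_row(rows))           # ('Zimbabwe', 4.2)
print(largest_ratio_row(rows))  # ('Andorra', 8.37)
```

In PySpark terms, the second function corresponds to calling `tail(1)` on the DataFrame returned by `orderBy("variance (percent %)")` rather than on the unsorted one. The sample here covers only three rows, so it says nothing about which country ranks highest in the full dataset.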
