
Spark Installation

In [1]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Downloading and installing Java 8

In [2]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Installing PySpark, the Python API for Spark

In [3]:
!pip install -q pyspark==3.0.0


Configuration

In [4]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
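Both variables must point to directories that actually exist, otherwise the later initialization steps fail with errors that are hard to trace. A minimal sketch of a sanity check follows; the helper name `missing_dirs` is hypothetical and not part of the original notebook.

```python
import os


def missing_dirs(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory.

    Hypothetical helper, added only for illustration.
    """
    missing = []
    for name in var_names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# An empty list means both paths were found on this machine.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

If the list is not empty, the download or installation cells above are the first place to look.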

Connecting PySpark to the Spark installation with findspark

In [5]:
!pip install -q findspark==1.4.2

In [6]:
import findspark

findspark.init()

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

Loading the Data

In [8]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

In [10]:
df = spark.read.csv(path = 'uk-macroeconomic-data.csv', sep = ',', header = True)

In [11]:
df.show()


More information about the columns

In [12]:
df.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

Number of rows in the df

In [16]:
df.count()

841

Number of columns in the df

In [17]:
len(df.columns)

77

Now, we start cleaning the data

In [19]:
df = df.select(['Description', 'Population (GB+NI)', 'Unemployment rate'])

In [23]:
df = df.\
  withColumnRenamed("Description", "year").\
  withColumnRenamed('Population (GB+NI)', 'population').\
  withColumnRenamed('Unemployment rate', 'unemployment_rate')

In [24]:
df.show()

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
| 1218|      null|             null|
| 1219|      null|             null|
| 1220|      null|             null|
| 1221|      null|             null|
| 1222|      null|             null|
| 1223|      null|             null|
| 1224|      null|             null|
| 1225|      null|             null|
| 1226|      null|             null|
| 1227|      null|             null|
+-----+----------+-----------------+
only showing top 20 rows



We see that the first row is not data: it describes the units of each column. It would get in the way of our analysis, so we need to remove it.

To isolate that row, we use the filter method

In [25]:
df_description = df.filter(df['year'] == 'Units')

In [27]:
df_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [28]:
from pyspark.sql.functions import broadcast

How we remove the first row from the df:

* We perform a left anti join, which keeps only the rows of df that have no match in df_description. Because df_description holds just the units row, that row is the only one dropped.

In [29]:
df = df.join(other = broadcast(df_description), on = ['year'], how = 'left_anti')
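To make the semantics of the join concrete, here is a rough plain-Python sketch of what a left anti join on a single key does. The function `left_anti_join` and the three sample rows (taken from the outputs shown in this notebook) are only illustrative; Spark's own implementation is distributed and works differently internally.

```python
def left_anti_join(left_rows, right_rows, key):
    """Keep the rows of left_rows whose key value does not appear in right_rows.

    Null (None) keys never match, so rows with a null key are kept.
    """
    right_keys = {row[key] for row in right_rows if row[key] is not None}
    return [row for row in left_rows if row[key] not in right_keys]


rows = [
    {"year": "Units", "population": "000s", "unemployment_rate": "%"},
    {"year": "1209", "population": None, "unemployment_rate": None},
    {"year": "1855", "population": "23241", "unemployment_rate": "3.73"},
]

# Same idea as df_description: isolate the units row first.
units_rows = [row for row in rows if row["year"] == "Units"]

cleaned = left_anti_join(rows, units_rows, key="year")
print([row["year"] for row in cleaned])  # ['1209', '1855']
```

Only the row whose `year` matches the units row is removed; every other row passes through unchanged.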

In [30]:
df.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



Removing all rows with missing values

In [31]:
df = df.dropna()
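Called with no arguments, dropna removes a row as soon as any of its columns is null. The plain-Python sketch below mimics that default behavior on a few rows; the function name `drop_rows_with_nulls` is hypothetical, and the row labeled "hypothetical" is made up for illustration and is not taken from the dataset.

```python
def drop_rows_with_nulls(rows):
    """Keep only the rows in which every value is present (not None)."""
    return [row for row in rows if all(value is not None for value in row.values())]


rows = [
    {"year": "1209", "population": None, "unemployment_rate": None},
    # Made-up row: one value present, one missing. It is dropped as well.
    {"year": "hypothetical", "population": "100", "unemployment_rate": None},
    {"year": "1855", "population": "23241", "unemployment_rate": "3.73"},
]

kept = drop_rows_with_nulls(rows)
print([row["year"] for row in kept])  # ['1855']
```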

In [32]:
df.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



Creating new columns with the withColumn method

In [33]:
df = df.withColumn('century', 1 + (df['year']/100).cast('int'))

We perform some aggregations to check if the century calculation makes sense

In [34]:
df.select(['century','year']).groupBy('century').agg({'year':'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



We see that:
* The 20th century has 100 years, which is correct if we count 1900 to 1999 as the 20th century (the convention implied by the formula above).
* The 19th century has 45 years, which is also correct: after the rows with missing values are dropped, the data starts in 1855.
* The 21st century has 17 years, which is as far as the data goes.
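The century arithmetic can be checked in plain Python at the boundary years. The sketch below reproduces the formula used in the withColumn step and compares it with a stricter convention in which centuries run from xx01 to xx00; that second function is an assumed alternative, not something the notebook uses.

```python
def century_notebook(year):
    """Same arithmetic as the withColumn expression: 1 + int(year / 100)."""
    return 1 + int(year / 100)


def century_strict(year):
    """Assumed alternative: centuries run from xx01 to xx00 (e.g. 1901 to 2000)."""
    return (year - 1) // 100 + 1


# The two conventions differ only for years that end in 00.
for year in (1855, 1899, 1900, 1999, 2000):
    print(year, century_notebook(year), century_strict(year))

# Years 1855 to 1899 inclusive fall in the 19th century under the notebook formula.
print(sum(1 for y in range(1855, 1900) if century_notebook(y) == 19))  # 45
```

Under the notebook's formula, 1900 and 2000 are assigned to the 20th and 21st centuries respectively, which is why the 20th century shows exactly 100 rows.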

Now, writing the data

In [36]:
df.repartition('century').write.csv(path = 'uk-macroeconomics-data-clean', sep = ',', header = True, mode = "overwrite")