
# **Install Java, Spark, and Findspark**

This installs Apache Spark, Java 8, and Findspark, a library that makes it easy for Python to find Spark.

In [48]:
%%bash
sudo apt-get update
apt-get install openjdk-8-jdk-headless -qq > /dev/null
[ ! -e "$(basename spark-3.1.2-bin-hadoop2.7.tgz)" ] && wget  http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz  
tar xf spark-3.1.2-bin-hadoop2.7.tgz
pip install -q findspark

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:14 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:15 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bioni

# **Set Environment Variables**

Set the locations where Spark and Java are installed.

In [49]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# **Start a SparkSession**

This starts a local Spark session. Before Spark 2.0.0, SparkContext was the entry point, and using the SQL, Hive, and Streaming APIs required creating a separate context for each.

From Spark 2.0.0 onwards, SparkSession provides a single point of entry to the DataFrame and Dataset APIs. All the functionality available through SparkContext is also available through SparkSession, so using a SparkSession instead of a SparkContext is now recommended.

Refer: http://spark.apache.org/docs/2.4.0/api/python/index.html

In [50]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# get a spark session. 
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Create Dataframe in Spark!

Now you can create a DataFrame in Spark. Download the file from https://storage.googleapis.com/hk1159/COVID_data.csv using wget, then create a DataFrame by calling the 'read.csv' function on the Spark session object.

In this implementation we let the Spark API infer the schema by setting inferSchema=True. We also set header=True so that the first line is used for the column names instead of being inserted as a row. This works for this dataset because its first line contains the column names.

# Alternative - Defining your own schema

Instead of letting the Spark API determine the schema, you can define your own by passing schema=demolition_schema to 'read.csv', where 'demolition_schema' is a schema definition that you construct before making the call.

References:

https://docs.databricks.com/getting-started/spark/dataframes.html#load-sample-data

https://docs.databricks.com/data/data-sources/read-csv.html

# Reading in the Data

In [51]:
! [ ! -e "$(basename COVID_data.csv)" ] && wget  'https://storage.googleapis.com/hk1159/COVID_data.csv'
df = spark.read.csv('COVID_data.csv',
                    header=True,
                    inferSchema=True)


# Cleaning/Scrubbing Data 



## Creating a Combined date field


In [52]:
df = df.withColumn("Full Date", F.concat(F.col("month"),F.lit('/'),
    F.col("year")))

print(df.columns)
df.printSchema()
df.head(3)

['row_id', 'code', 'country', 'region_code', 'region', 'wave', 'month', 'year', 'indicator_topic', 'indicator', 'indicator_description', 'indicator_display', 'indicator_val', 'urban_rural', 'industry', 'sample_subset', 'sample_total', 'FCS', 'income_group', 'lending_category', 'unit_measure', 'measure_type', 'GDP_pc', 'ln_GDP_pc', 'many_waves', 'weight_type', 'footnote', 'survey_producer', 'survey_link', 'last_updated', 'source', 'Full Date']
root
 |-- row_id: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- region: string (nullable = true)
 |-- wave: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- indicator_topic: string (nullable = true)
 |-- indicator: string (nullable = true)
 |-- indicator_description: string (nullable = true)
 |-- indicator_display: string (nullable = true)
 |-- indicator_val: double (nullable = true)
 |-- ur

[Row(row_id=1, code='AFG', country='Afghanistan', region_code='SAR', region='South Asia', wave='WAVE1', month=9, year=2020, indicator_topic='Demographic', indicator='Demo_educ1', indicator_description='% respondent by level of education - No education', indicator_display='Level of education - No education', indicator_val=70.0, urban_rural='National', industry='All', sample_subset=5013, sample_total=5057, FCS='High-Intensity Conflict', income_group='Low income', lending_category='IDA', unit_measure='Percent of respondent', measure_type='p', GDP_pc=2065, ln_GDP_pc=8, many_waves=0, weight_type='Individual weight', footnote=None, survey_producer=None, survey_link=None, last_updated='03/8/2022 16:44PM', source='High Frequency Phone Survey', Full Date='9/2020'),
 Row(row_id=2, code='AFG', country='Afghanistan', region_code='SAR', region='South Asia', wave='WAVE1', month=9, year=2020, indicator_topic='Demographic', indicator='Demo_educ2', indicator_description='% respondent by level of educat

## Counting NAs in each column

In [53]:
from pyspark.sql.functions import col, isnan, when, count

# For each column, count the rows whose value is NaN or null.
df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
).show()

+------+----+-------+-----------+------+----+-----+----+---------------+---------+---------------------+-----------------+-------------+-----------+--------+-------------+------------+-----+------------+----------------+------------+------------+------+---------+----------+-----------+--------+---------------+-----------+------------+------+---------+
|row_id|code|country|region_code|region|wave|month|year|indicator_topic|indicator|indicator_description|indicator_display|indicator_val|urban_rural|industry|sample_subset|sample_total|  FCS|income_group|lending_category|unit_measure|measure_type|GDP_pc|ln_GDP_pc|many_waves|weight_type|footnote|survey_producer|survey_link|last_updated|source|Full Date|
+------+----+-------+-----------+------+----+-----+----+---------------+---------+---------------------+-----------------+-------------+-----------+--------+-------------+------------+-----+------------+----------------+------------+------------+------+---------+----------+-----------+------

## Filling the NAs with a nonsense date


In [54]:
df = df.fillna({'Full Date': '9/9999'})
df.groupBy('Full Date').count().show()

+---------+-----+
|Full Date|count|
+---------+-----+
|  10/2021|  228|
|   6/2021|14138|
|   2/2021| 1860|
|   9/9999|    4|
|   1/2021| 3215|
|   9/2021|  268|
|   9/2020| 4905|
|   5/2020|12598|
|  10/2020| 5439|
|   7/2021| 2638|
|  11/2021|  591|
|   3/2020|  294|
|  12/2021|  140|
|  11/2020| 4292|
|  12/2020| 6674|
|   3/2021| 5179|
|   4/2021| 1929|
|   6/2020|20983|
|   5/2021| 5253|
|   8/2021| 1905|
+---------+-----+
only showing top 20 rows



## Look at what these 9/9999 dates look like


In [55]:
df.where(df["Full Date"] == "9/9999").show()

+------+----+-------+-----------+--------------------+-----+-----+----+---------------+----------------+---------------------+--------------------+-------------+-----------+------------+-------------+------------+----+------------+----------------+--------------------+------------+------+---------+----------+-----------+--------+---------------+-----------+-----------------+--------------------+---------+
|row_id|code|country|region_code|              region| wave|month|year|indicator_topic|       indicator|indicator_description|   indicator_display|indicator_val|urban_rural|    industry|sample_subset|sample_total| FCS|income_group|lending_category|        unit_measure|measure_type|GDP_pc|ln_GDP_pc|many_waves|weight_type|footnote|survey_producer|survey_link|     last_updated|              source|Full Date|
+------+----+-------+-----------+--------------------+-----+-----+----+---------------+----------------+---------------------+--------------------+-------------+-----------+---------

## Rounding off the decimal place in Indicator Value

In [56]:
from pyspark.sql.functions import format_number

# format_number rounds to 0 decimal places but returns the result as a string column.
df = df.withColumn('indicator_val', format_number('indicator_val', 0))
df.show()

+------+----+-----------+-----------+----------+-----+-----+----+---------------+----------------+---------------------+--------------------+-------------+-----------+--------------------+-------------+------------+--------------------+------------+----------------+--------------------+------------+------+---------+----------+-----------------+--------------------+---------------+-----------+-----------------+--------------------+---------+
|row_id|code|    country|region_code|    region| wave|month|year|indicator_topic|       indicator|indicator_description|   indicator_display|indicator_val|urban_rural|            industry|sample_subset|sample_total|                 FCS|income_group|lending_category|        unit_measure|measure_type|GDP_pc|ln_GDP_pc|many_waves|      weight_type|            footnote|survey_producer|survey_link|     last_updated|              source|Full Date|
+------+----+-----------+-----------+----------+-----+-----+----+---------------+----------------+------------

## Changing the name of Indicator Values Column


In [57]:
df = df.withColumnRenamed("indicator_val", "indicator_values")
df.describe()

DataFrame[summary: string, row_id: string, code: string, country: string, region_code: string, region: string, wave: string, month: string, year: string, indicator_topic: string, indicator: string, indicator_description: string, indicator_display: string, indicator_values: string, urban_rural: string, industry: string, sample_subset: string, sample_total: string, FCS: string, income_group: string, lending_category: string, unit_measure: string, measure_type: string, GDP_pc: string, ln_GDP_pc: string, many_waves: string, weight_type: string, footnote: string, survey_producer: string, survey_link: string, last_updated: string, source: string, Full Date: string]

# Some Summary Stats 




## Summary of Data Frame

In [58]:
df.summary("count", "min", "25%", "75%", "max").show()

+-------+------+------+-----------+-----------+-------------------+------+------+------+--------------------+--------------------+---------------------+--------------------+----------------+-----------+--------------+-------------+------------+--------------------+-------------------+----------------+--------------------+------------+------+---------+----------+----------------+--------------------+--------------------+--------------------+-----------------+--------------------+---------+
|summary|row_id|  code|    country|region_code|             region|  wave| month|  year|     indicator_topic|           indicator|indicator_description|   indicator_display|indicator_values|urban_rural|      industry|sample_subset|sample_total|                 FCS|       income_group|lending_category|        unit_measure|measure_type|GDP_pc|ln_GDP_pc|many_waves|     weight_type|            footnote|     survey_producer|         survey_link|     last_updated|              source|Full Date|
+-------+---

## Looking at how many rows there are for each country

In [59]:

df.groupBy('country').count().show()


+--------------------+-----+
|             country|count|
+--------------------+-----+
|                Chad|  917|
|            Paraguay| 2239|
|             Senegal|  751|
|              Guyana|  709|
|         Philippines| 2142|
|            Djibouti|  412|
|            Malaysia|  799|
|              Malawi| 3595|
|                Iraq| 2909|
|         Afghanistan|   38|
|            Cambodia| 3162|
|            Maldives|   39|
|              Rwanda| 1140|
|               Sudan| 3323|
|           Sri Lanka|   39|
|            Dominica|  844|
|           Argentina| 2173|
|Central African R...|  152|
|             Ecuador| 3303|
|  Mozambique (urban)| 2486|
+--------------------+-----+
only showing top 20 rows



## Showing How Many Distinct Countries There Are


In [60]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df.createOrReplaceTempView("mytable")
total_countries = spark.sql("""
    SELECT COUNT(DISTINCT country)
    FROM mytable
""")
print(type(total_countries))
print(total_countries.describe())
total_countries.show()

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[summary: string, count(DISTINCT country): string]
+-----------------------+
|count(DISTINCT country)|
+-----------------------+
|                     84|
+-----------------------+




# Filtering and Looking at columns


## Looking at a subset of Iraq and Wave 1 in a table

In [61]:
columns= ['country','unit_measure','wave','ln_GDP_pc',
          'measure_type']

df.where((df['country'] == "Iraq") & (df['wave'] == "WAVE1")) \
  .select(columns).show()

+-------+--------------------+-----+---------+------------+
|country|        unit_measure| wave|ln_GDP_pc|measure_type|
+-------+--------------------+-----+---------+------------+
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|        9|           p|
|   Iraq|Percent of respon...|WAVE1|    

## Looking at how many rows had the country of Iraq and industries equal to "All"


In [62]:
df.filter((df["country"] == "Iraq") & (df["industry"] == "All")).count()

798

# Finding the Range of GDP per Capita


In [63]:
# Use the F alias imported earlier so Python's built-in max and min are not shadowed.
df.select(F.max("GDP_pc"), F.min("GDP_pc")).show()

+-----------+-----------+
|max(GDP_pc)|min(GDP_pc)|
+-----------+-----------+
|      33185|        945|
+-----------+-----------+



# Tables showing the Iraq values


In [64]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df.createOrReplaceTempView("mytable")

Iraq_country = spark.sql("""
    SELECT * FROM mytable WHERE country = 'Iraq'
""")
print(type(Iraq_country))
print(Iraq_country.describe())
Iraq_country.show()

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[summary: string, row_id: string, code: string, country: string, region_code: string, region: string, wave: string, month: string, year: string, indicator_topic: string, indicator: string, indicator_description: string, indicator_display: string, indicator_values: string, urban_rural: string, industry: string, sample_subset: string, sample_total: string, FCS: string, income_group: string, lending_category: string, unit_measure: string, measure_type: string, GDP_pc: string, ln_GDP_pc: string, many_waves: string, weight_type: string, footnote: string, survey_producer: string, survey_link: string, last_updated: string, source: string, Full Date: string]
+------+----+-------+-----------+--------------------+-----+-----+----+---------------+----------+---------------------+--------------------+----------------+-----------+------------+-------------+------------+--------------------+-------------------+----------------+--------------------+-

# Creating a Two-Way Frequency Table with Year and Income Group

In [65]:
Year_Freq = df.crosstab('year', 'income_group')
Year_Freq.show()

+-----------------+-----------+----------+-------------------+-------------------+
|year_income_group|High income|Low income|Lower middle income|Upper middle income|
+-----------------+-----------+----------+-------------------+-------------------+
|             2021|       2378|     11157|               8811|              14998|
|             2020|       3369|     28569|              30028|              23545|
+-----------------+-----------+----------+-------------------+-------------------+



# Differences between Pandas and Spark


1. The first big difference for me is that Spark has very easy-to-use SQL capabilities built in, which makes it easier to understand for someone who knows SQL well but not Python. Pandas has the pandasql package, but SQL support is not built in as seamlessly.

2. Spark's outputs seem messier, but some things, like the two-way table above, are easier to produce in Spark than in Pandas.

3. On the other hand, updating, deleting, and adding columns seems easier in Pandas than in Spark.
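
For comparison, here is a sketch of how the two-way table and column edits from this notebook might look in pandas. The `pdf` DataFrame holds a few made-up rows rather than the COVID data, and the column names are borrowed from the dataset only for illustration.

```python
import pandas as pd

# Made-up sample rows, only for illustration.
pdf = pd.DataFrame({
    "year": [2020, 2020, 2021, 2021, 2020],
    "income_group": [
        "Low income", "High income", "Low income", "Low income", "Low income",
    ],
})

# Two-way frequency table, comparable to df.crosstab('year', 'income_group') in Spark.
table = pd.crosstab(pdf["year"], pdf["income_group"])
print(table)

# Columns are added by plain assignment and removed with drop.
pdf["full_date"] = "9/" + pdf["year"].astype(str)
with_date_columns = list(pdf.columns)
pdf = pdf.drop(columns=["full_date"])
```

In Spark the same column edits go through `withColumn` and `drop`, each of which returns a new DataFrame instead of changing the existing one.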
