## Basic initialisation

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
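from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# No `from pyspark.sql.functions import *`: it shadows Python built-ins such as max, min and sum.
# Spark SQL functions are used through the F. prefix instead.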


In [2]:
# Create (or reuse) a SparkSession; the SparkContext is kept because sc._jvm is used later
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Investigate dataset schema
At this point, it is enough to read in a single file to ascertain the data structure. You will be required to use the information obtained from the small subset to create a data schema. This data schema will be used when reading the entire dataset using Spark.

> ℹ️ **Instructions** ℹ️
>
>Make use of Pandas to read in a single file and investigate the plausible data types to be used when creating a Spark data schema. 
>
>*You may use as many coding cells as necessary.*
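The type investigation can be sketched in plain Python before committing to a Spark schema. The helper functions below are hypothetical (not part of the notebook): for each column they try to parse every value as an integer, then as a float, and fall back to string. The two sample rows are copied, with the `text` value shortened, from the first two rows of `train.csv` as displayed in the output of the next cell.

```python
import csv
import io

# Two rows copied from the start of train.csv (text shortened); a small sample only.
SAMPLE = '''doi,text_id,text,sdg,labels_negative,labels_positive,agreement,id
10.18356/5950d914-en,bf7763beb5ad7a16764d1b7fa87ab018,Indicators for targets 9.b and 9.c,9,4,5,0.1111111111111111,1
10.18356/5950d914-en,b6415a528064b85fdde4b4c61239ed3a,Manufacturing value added,9,0,3,1.0,2
'''

def infer_type(values):
    """Return the narrowest of 'integer', 'float' or 'string' that parses every non-empty value."""
    non_empty = [v for v in values if v != '']
    for name, parse in (('integer', int), ('float', float)):
        try:
            for v in non_empty:
                parse(v)
            return name
        except ValueError:
            continue  # at least one value failed; try the next, wider type
    return 'string'

def infer_schema(csv_text):
    """Map each column header to an inferred type name."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {col: infer_type([row[col] for row in rows]) for col in rows[0]}

print(infer_schema(SAMPLE))
```

On this sample the result matches the schema chosen further down: strings for `doi`, `text_id` and `text`, integers for `sdg`, `labels_negative`, `labels_positive` and `id`, and a float for `agreement`. With pandas, `pd.read_csv('train.csv').dtypes` gives a similar first impression (`int64`, `float64`, `object`).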

In [3]:
# Read the file with default CSV options (every column is loaded as a string) and inspect it in pandas
df = spark.read.csv('train.csv', header=True)

df.toPandas()

Unnamed: 0,doi,text_id,text,sdg,labels_negative,labels_positive,agreement,id
0,10.18356/5950d914-en,bf7763beb5ad7a16764d1b7fa87ab018,Indicators for targets 9.b and 9.c have data a...,9,4,5,0.1111111111111111,1
1,10.18356/5950d914-en,b6415a528064b85fdde4b4c61239ed3a,Manufacturing value added as a percentage of G...,9,0,3,1.0,2
2,10.18356/31959a6d-en,29127def7e81b999b87c8e887a4fe882,"""To Share or Not to Share: That is the Questio...",Politics,and Financial Stability. R. Holzmann,E. Palmer and D. Robalino (Eds.).Washington,DC: World Bank. The Political Economy of Pens...,"CA: Academic Press."""
3,10.1787/eco/surveys-cze-2014-6-en,459db322b9e44630320fda76783e0f69,"As of 2004, parents can work without losing th...",4,2,2,0.0,4
4,10.1787/9789264119536-11-en,8b7d8c6c605fe9695d08ab03d601e0e9,A question of considerable policy relevance is...,10,1,4,0.6,5
...,...,...,...,...,...,...,...,...
25939,10.18356/2d07fa8a-en,eb5757e9da9a0b61a2d8a2bd352f34ce,"But, in general, such programmes neither targe...",2,0,3,1.0,25940
25940,10.18356/daadf51f-en,2bdeee9dff95a4a110cbb75648582558,"Concentration of refugees in remote camps, as ...",4,1,8,0.7777777777777778,25941
25941,10.1787/9789264235151-4-en,0754b035be2628e9a06afea711a04fec,Designating 3.8 million ha for rice production...,2,0,9,1.0,25942
25942,10.1787/9789264209138-3-en,7dae9220a2463cc1d2b2513c032317a8,Figure 1.1 proposes a simplified framework tha...,6,1,2,0.3333333333333333,25943
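Row 2 above is split across the wrong columns: part of its quoted `text` value spills into `sdg`, `labels_negative` and the following columns. One plausible cause, which is an assumption here, is that the file escapes quotes inside quoted fields by doubling them (`""`, the RFC 4180 convention), whereas Spark's CSV reader uses a backslash as its default `escape` character. The stdlib sketch below uses a made-up line (hypothetical, not taken from the dataset) to show that a parser which understands doubled quotes keeps such a field intact.

```python
import csv
import io

# Hypothetical line in the same layout as train.csv: the text field is quoted,
# contains commas, and escapes its inner quotes by doubling them ("").
LINE = ('10.18356/example-en,abc123,'
        '"""To Share or Not to Share"", Washington, DC: World Bank",'
        '9,0,3,1.0,3\n')

# Python's csv module treats "" inside a quoted field as one literal quote by default.
fields = next(csv.reader(io.StringIO(LINE)))
print(len(fields))   # 8
print(fields[2])     # "To Share or Not to Share", Washington, DC: World Bank
```

In PySpark the corresponding reader options would be `spark.read.csv('train.csv', header=True, multiLine=True, escape='"')`; whether that fully repairs this particular file has not been verified here.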


In [4]:
df.printSchema()

root
 |-- doi: string (nullable = true)
 |-- text_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- sdg: string (nullable = true)
 |-- labels_negative: string (nullable = true)
 |-- labels_positive: string (nullable = true)
 |-- agreement: string (nullable = true)
 |-- id: string (nullable = true)



In [5]:
df.columns

['doi',
 'text_id',
 'text',
 'sdg',
 'labels_negative',
 'labels_positive',
 'agreement',
 'id']

In [6]:
df.describe().toPandas()

Unnamed: 0,summary,doi,text_id,text,sdg,labels_negative,labels_positive,agreement,id
0,count,25944,25944,25944,25944,25941,25938,25936,25935
1,mean,,,,7.4128027405991075,2.305501720523672,5.373623781063228,1.3076224698434045,12689.553507052267
2,stddev,,,,42.068342181534675,38.744960860960255,25.127344126730385,33.16511227215876,7638.091499003142
3,min,10.14217/128123a5-en,00021941702cd84171ff33962197ca1f,"""""""This again places mitigation and adaptation...","""""Expand Your Business” (IYB)","""""Diagnostico del cluster forestal""""","""""Announcing a new dawn prematurely? London","""""Optimal Spectrum Sensing Framework for Cogn...",(c) a review by an independent panel set up b...
4,max,11.1002/pub/81234575-036c9708-en,ffff357a454e2d7544f765cf2660e684,’Z2 Research in the United States puts the ret...,’ La Paz,she may accept a lump-sum payment in lieu of h...,Vo\. 2,9,9999


## Update column names
To make the data easier to work with, you will need to make a few changes:
1. Column headers should all be in lowercase; and
2. Whitespaces should be replaced with underscores.


> ℹ️ **Instructions** ℹ️
>
>Make sure that the column headers are all in lowercase and that any whitespaces are replaced with underscores.
>
>*You may use as many coding cells as necessary.*
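The renaming rule is the expression used in the loop below, pulled out into a small function so it can be checked on its own. The headers in the example are hypothetical, since the headers in `train.csv` already satisfy the rule.

```python
def normalise_column_name(name):
    """Lowercase a header and replace each run of whitespace with a single underscore."""
    # str.split() with no argument splits on any whitespace and drops leading/trailing runs.
    return '_'.join(name.split()).lower()

headers = ['DOI', 'Text ID', '  Labels   Negative ', 'agreement']
print([normalise_column_name(h) for h in headers])
```

In PySpark, the same function could rename every column in a single call: `df = df.toDF(*[normalise_column_name(c) for c in df.columns])`.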

In [7]:
# Lowercase each header and replace runs of whitespace with underscores
for column in df.columns:
    df = df.withColumnRenamed(column, '_'.join(column.split()).lower())
df.columns

['doi',
 'text_id',
 'text',
 'sdg',
 'labels_negative',
 'labels_positive',
 'agreement',
 'id']

## Null Values
Null values often represent missing pieces of data. It is always good to know where your null values lie so that you can quickly identify and remedy any issues stemming from them.

> ℹ️ **Instructions** ℹ️
>
>Write code to count the number of null values found in each column.
>
>*You may use as many coding cells as necessary.*
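The predicate used in the next cell counts a value as missing if it is null, empty, or *contains* the substrings `None` or `NULL`. A plain-Python sketch of that logic, on hypothetical rows, shows a side effect worth knowing: a substring test also flags ordinary sentences such as "None of the targets were met", so the `text` count may include false positives. An exact-match variant is shown for comparison; which one suits this dataset is a judgement call.

```python
# Hypothetical rows standing in for DataFrame records (None = a real null).
rows = [
    {'text': 'None of the targets were met', 'sdg': '9'},
    {'text': '', 'sdg': 'NULL'},
    {'text': None, 'sdg': '4'},
    {'text': 'Manufacturing value added', 'sdg': None},
]

def is_missing_substring(value):
    """Mirror of the notebook's predicate: null, empty, or containing 'None'/'NULL'."""
    return value is None or value == '' or 'None' in value or 'NULL' in value

def is_missing_exact(value):
    """Stricter variant: only null, empty, or exactly 'None'/'NULL'."""
    return value is None or value in ('', 'None', 'NULL')

def count_missing(rows, predicate):
    """Count, per column, how many values the predicate flags as missing."""
    return {col: sum(predicate(row[col]) for row in rows) for col in rows[0]}

print(count_missing(rows, is_missing_substring))  # {'text': 3, 'sdg': 2}
print(count_missing(rows, is_missing_exact))      # {'text': 2, 'sdg': 2}
```

In PySpark, the exact-match version of the condition would be `F.col(c).isNull() | F.col(c).isin('', 'None', 'NULL')`.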

In [8]:
# Count values that are null, empty, or contain the strings 'None'/'NULL'.
# Caveat: contains() is a substring match, so ordinary text such as "None of ..." is also counted.
df_null = df.select([
    F.count(F.when(F.col(c).contains('None')
                   | F.col(c).contains('NULL')
                   | (F.col(c) == '')
                   | F.col(c).isNull(), c)).alias(c)
    for c in df.columns
])



In [9]:
df_null.show()

+---+-------+----+---+---------------+---------------+---------+---+
|doi|text_id|text|sdg|labels_negative|labels_positive|agreement| id|
+---+-------+----+---+---------------+---------------+---------+---+
|  0|      0| 113|  2|              3|              6|        8| 10|
+---+-------+----+---+---------------+---------------+---------+---+



## Data type conversion - The final data schema

Now that we have identified the number of missing values in the dataset, we'll move on to converting the columns to the required data types.

> ℹ️ **Instructions** ℹ️
>
>Use typecasting to convert the string data types in your current data schema to more appropriate data types.
>
>*You may use as many coding cells as necessary.*
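Besides a `StructType`, `spark.read.csv` also accepts its `schema` argument as a DDL-formatted string. A small sketch, assuming the same column types that the notebook chooses below (the helper name `to_ddl` is hypothetical), builds that string from a plain mapping:

```python
# Column types as chosen in the notebook's StructType, written as Spark SQL type names.
COLUMN_TYPES = {
    'doi': 'STRING',
    'text_id': 'STRING',
    'text': 'STRING',
    'sdg': 'INT',
    'labels_negative': 'INT',
    'labels_positive': 'INT',
    'agreement': 'FLOAT',
    'id': 'INT',
}

def to_ddl(column_types):
    """Join 'name TYPE' pairs into a DDL schema string such as 'a STRING, b INT'."""
    return ', '.join(f'{name} {dtype}' for name, dtype in column_types.items())

ddl = to_ddl(COLUMN_TYPES)
print(ddl)
```

The string could then be passed as `spark.read.csv('train.csv', header=True, schema=ddl)`. To cast columns of an already-loaded DataFrame instead, `F.col('sdg').cast('int')` is the column-level equivalent.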

In [10]:
df.show(2)

+--------------------+--------------------+--------------------+---+---------------+---------------+------------------+---+
|                 doi|             text_id|                text|sdg|labels_negative|labels_positive|         agreement| id|
+--------------------+--------------------+--------------------+---+---------------+---------------+------------------+---+
|10.18356/5950d914-en|bf7763beb5ad7a167...|Indicators for ta...|  9|              4|              5|0.1111111111111111|  1|
|10.18356/5950d914-en|b6415a528064b85fd...|Manufacturing val...|  9|              0|              3|               1.0|  2|
+--------------------+--------------------+--------------------+---+---------------+---------------+------------------+---+
only showing top 2 rows



In [11]:
df.printSchema()

root
 |-- doi: string (nullable = true)
 |-- text_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- sdg: string (nullable = true)
 |-- labels_negative: string (nullable = true)
 |-- labels_positive: string (nullable = true)
 |-- agreement: string (nullable = true)
 |-- id: string (nullable = true)



In [12]:
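# Explicit schema built from the types identified above.
# FloatType is single precision (agreement prints as 0.11111111); DoubleType would keep full precision.
schema = StructType([StructField('doi', StringType(), True),
                     StructField('text_id', StringType(), True),
                     StructField('text', StringType(), True),
                     StructField('sdg', IntegerType(), True),
                     StructField('labels_negative', IntegerType(), True),
                     StructField('labels_positive', IntegerType(), True),
                     StructField('agreement', FloatType(), True),
                     StructField('id', IntegerType(), True)])

# Re-read the file with the schema; multiLine=True lets quoted fields span several lines.
# Spark's CSV reader uses a backslash as its default escape character; if the file doubles
# quotes ("") inside quoted fields, escape='"' may also be needed.
df_1 = spark.read.csv('train.csv', header=True, schema=schema, multiLine=True)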

In [13]:
df_1.printSchema()

root
 |-- doi: string (nullable = true)
 |-- text_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- sdg: integer (nullable = true)
 |-- labels_negative: integer (nullable = true)
 |-- labels_positive: integer (nullable = true)
 |-- agreement: float (nullable = true)
 |-- id: integer (nullable = true)



In [14]:
df_1.show(2000)

+--------------------+--------------------+--------------------+----+---------------+---------------+----------+----+
|                 doi|             text_id|                text| sdg|labels_negative|labels_positive| agreement|  id|
+--------------------+--------------------+--------------------+----+---------------+---------------+----------+----+
|10.18356/5950d914-en|bf7763beb5ad7a167...|Indicators for ta...|   9|              4|              5|0.11111111|   1|
|10.18356/5950d914-en|b6415a528064b85fd...|Manufacturing val...|   9|              0|              3|       1.0|   2|
|10.18356/31959a6d-en|29127def7e81b999b...|"To Share or Not ...|null|           null|           null|      null|null|
|10.1787/eco/surve...|459db322b9e446303...|As of 2004, paren...|   4|              2|              2|       0.0|   4|
|10.1787/978926411...|8b7d8c6c605fe9695...|A question of con...|  10|              1|              4|       0.6|   5|
|10.1787/978926425...|36751c50d4f7799b7...|In Model A, p

## Consolidate missing values
We need to check whether the data type conversion above was done correctly.
When a value cannot be converted to its target type, Spark inserts a null instead (in the CSV reader's default, permissive mode), so failed conversions show up as additional nulls in the resulting DataFrame.
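A rough plain-Python model of that behaviour (an approximation of Spark's permissive conversion, not its implementation; `try_cast` is a hypothetical helper) turns any value that cannot be parsed into `None` instead of raising an error. The values come from the outputs above, including `Politics`, which landed in the `sdg` column after the misparsed row.

```python
def try_cast(value, target):
    """Return target(value), or None when the value is null or cannot be parsed (a failed cast)."""
    if value is None:
        return None
    try:
        return target(value)
    except ValueError:
        return None

# 'Politics' ended up in sdg when row 2 was split across the wrong columns.
sdg_values = ['9', '9', 'Politics', '4', '10']
cast_sdg = [try_cast(v, int) for v in sdg_values]
print(cast_sdg)                          # [9, 9, None, 4, 10]
print(sum(v is None for v in cast_sdg))  # 1 new null introduced by the conversion
```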


> ℹ️ **Instructions** ℹ️
>
>Write code to compare the number of invalid entries (nulls) pre-conversion and post-conversion.
>
>*You may use as many coding cells as necessary.*

In [15]:
# Same null-like count as before, applied to the typed DataFrame
df_1_null = df_1.select([
    F.count(F.when(F.col(c).contains('None')
                   | F.col(c).contains('NULL')
                   | (F.col(c) == '')
                   | F.col(c).isNull(), c)).alias(c)
    for c in df_1.columns
])

df_null.show()
df_1_null.show()

+---+-------+----+---+---------------+---------------+---------+---+
|doi|text_id|text|sdg|labels_negative|labels_positive|agreement| id|
+---+-------+----+---+---------------+---------------+---------+---+
|  0|      0| 113|  2|              3|              6|        8| 10|
+---+-------+----+---+---------------+---------------+---------+---+

+---+-------+----+---+---------------+---------------+---------+---+
|doi|text_id|text|sdg|labels_negative|labels_positive|agreement| id|
+---+-------+----+---+---------------+---------------+---------+---+
|  0|      0| 113|852|            671|            515|      383|481|
+---+-------+----+---+---------------+---------------+---------+---+



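Comparing the two tables shows where casts went wrong: the string columns are unchanged, while every numeric column gains hundreds of nulls after conversion (for example, `sdg` goes from 2 to 852).
Do not attempt to correct any missing values at this point. This will be dealt with in later sections of the predict.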
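The comparison can be made explicit by subtracting one row of counts from the other. The numbers below are copied from the two tables above; in the notebook they could be pulled out with `df_null.first().asDict()` and `df_1_null.first().asDict()`. Because `df_1` was re-read with `multiLine=True`, the two DataFrames may not contain exactly the same rows, so the differences are an indicator of failed casts rather than an exact count.

```python
# Null counts copied from the outputs above: before (all strings) and after conversion.
before = {'doi': 0, 'text_id': 0, 'text': 113, 'sdg': 2, 'labels_negative': 3,
          'labels_positive': 6, 'agreement': 8, 'id': 10}
after = {'doi': 0, 'text_id': 0, 'text': 113, 'sdg': 852, 'labels_negative': 671,
         'labels_positive': 515, 'agreement': 383, 'id': 481}

# Extra nulls per column after conversion (an indicator of failed casts; see caveat above).
new_nulls = {col: after[col] - before[col] for col in before}
print(new_nulls)
```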

## Generate parquet files
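When writing in Spark, we typically use the Parquet format. Parquet is a columnar format that Spark can write in parallel (typically one file per partition), and it stores the schema alongside the data, so column names and types are preserved when the files are read back.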

When writing, make sure that the data is sufficiently partitioned.

A common rule of thumb is roughly one partition for every 200 MB of data, but the right number also depends on the size of your cluster and executors.
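The rule of thumb translates into a small calculation. The sketch mirrors the logic used in the next cells (divide the size by 200 MB, round down, and fall back to 2 partitions when the result is 0); the 200 MB target and the fallback of 2 are this notebook's choices, not Spark defaults, and `partition_count` is a hypothetical helper.

```python
def partition_count(size_bytes, target_mb=200, fallback=2):
    """Number of ~target_mb partitions for size_bytes, or `fallback` if that rounds down to 0."""
    size_mb = size_bytes / 1_000_000
    calculated = int(size_mb / target_mb)
    return calculated if calculated > 0 else fallback

print(partition_count(5_188_144))        # 2 (5.19 MB rounds down to 0, so the fallback applies)
print(partition_count(1_000_000_000))    # 5 (1000 MB / 200 MB)
```

Note that with this logic anything between 200 MB and 400 MB gets a single partition, fewer than the fallback for small data; using `max(calculated, 2)` instead would enforce a floor of 2 everywhere.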


### Check the size of the dataframe before partitioning

In [16]:
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

In [17]:
# Rough size estimate using internal PySpark/JVM helpers (_reserialize, SerDe, SizeEstimator)
# that are not part of the public API; treat the result as approximate.
rdd = df_1.rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
obj = rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
size = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(obj)
size_MB = size / 1_000_000

# Roughly one partition per 200 MB, falling back to 2 partitions for small data
calc_part = int(size_MB / 200)
partitions = calc_part if calc_part > 0 else 2
print(f'The dataframe is {size_MB} MB')
print(f'partitions is {partitions}')

The dataframe is 5.188144 MB
partitions is 2
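The estimate above relies on internal helpers that may change between PySpark versions. A simpler, if different, proxy is the size of the input file on disk. The sketch below writes a throwaway file so that it runs on its own; in the notebook you would pass `'train.csv'` instead. On-disk CSV size and in-memory size are not the same quantity, so either number is only a rough guide for choosing partitions; `file_size_mb` is a hypothetical helper.

```python
import os
import tempfile

def file_size_mb(path):
    """Size of a file on disk in (decimal) megabytes."""
    return os.path.getsize(path) / 1_000_000

# Write a throwaway 2.5 MB file so the example is self-contained.
with tempfile.NamedTemporaryFile('wb', suffix='.csv', delete=False) as tmp:
    tmp.write(b'x' * 2_500_000)
    path = tmp.name

print(f'The file is {file_size_mb(path)} MB')   # The file is 2.5 MB
os.remove(path)
```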


In [18]:
rdd.getNumPartitions()

1

### Write parquet files to the local directory
> ℹ️ **Instructions** ℹ️
>
> Use the **coalesce** function and the number of **partitions** derived above to write parquet files to your local directory 
>
>*You may use as many coding cells as necessary.*
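One caveat: `coalesce` can only reduce the number of partitions. Requesting more partitions than the DataFrame currently has leaves the count unchanged, whereas `repartition` performs a full shuffle and can increase it. Since `getNumPartitions()` reports 1 above, `coalesce(2)` would most likely still write a single partition. The toy model below uses plain Python lists as stand-ins for partitions; it is a simplification for illustration, not Spark's actual algorithm, and both function names are hypothetical.

```python
def coalesce(partitions, n):
    """Merge neighbouring partitions down to at most n; never increases the count."""
    target = max(1, min(n, len(partitions)))
    merged = [[] for _ in range(target)]
    for i, part in enumerate(partitions):
        # Map each existing partition onto one of the target buckets, preserving order.
        merged[i * target // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Redistribute every row round-robin into exactly n partitions (a 'shuffle')."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]

single = [list(range(10))]            # one partition, as reported above
print(len(coalesce(single, 2)))       # 1
print(len(repartition(single, 2)))    # 2
```

In PySpark, `df_1.repartition(partitions)` is the call that can raise the partition count, and `df_1.rdd.getNumPartitions()` reports the current number.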

In [19]:
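# coalesce() can only reduce the partition count; if df_1 has fewer partitions than requested,
# df_1.repartition(partitions) would be needed to increase it.
# mode("append") adds new files on every run; mode("overwrite") would replace earlier output.
df_1.coalesce(partitions).write.format("parquet").mode("append").save("train_ingested_data")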