The cell below defines the input and output paths used by every exercise and creates the results directory if it does not exist yet.

In [13]:
import os
import shutil

# General-purpose paths
data_small_path = os.path.join('data', 'data_small.csv')
data_big_path = os.path.join('data', 'data_big.csv')
results_path = 'results'

# Per-exercise output locations; each one is handed to saveAsTextFile
# in the corresponding exercise cell.
output_rdd_ex1_path = os.path.join(results_path, 'Ex1_rdd.csv')
output_rdd_ex2_path = os.path.join(results_path, 'Ex2_rdd')
output_rdd_ex3_path = os.path.join(results_path, 'Ex3_rdd')

# Make sure the results directory exists. Anything already inside it is
# left untouched; exist_ok avoids the separate "check, then create" step.
os.makedirs(results_path, exist_ok=True)

# Exercise 1

Create an inverted index with the following structure:

- Continent - Count of occurrences

In [12]:
%%time
!rm -rf {output_rdd_ex1_path}
import pyspark
from operator import add

def toCSVLine(data):
    """Join the items of ``data`` into a single comma-separated string.

    Every item is converted with ``str``; no quoting or escaping is done.
    """
    fields = map(str, data)
    return ','.join(fields)

sc = pyspark.SparkContext('local[*]')
try:
    lines = sc.textFile(data_big_path)
    # Split each record once, then build (column 5, int(column 9)) pairs;
    # column 5 is used as the continent key.
    fields = lines.map(lambda line: line.split(','))
    continent_occurences = fields.map(lambda cols: (cols[5], int(cols[9])))
    # Add up the integer values of every record sharing the same key.
    invertedIdx = continent_occurences.reduceByKey(add)
    invertedIdx_csv = invertedIdx.map(toCSVLine)
    invertedIdx_csv.saveAsTextFile(output_rdd_ex1_path)
except Exception as e:
    # Best effort: report the failure in the cell output instead of raising.
    print(e)
finally:
    # Always release the context (also on KeyboardInterrupt) so that the
    # next cell is able to create a fresh SparkContext.
    sc.stop()

CPU times: user 50 ms, sys: 10 ms, total: 60 ms
Wall time: 14.8 s


# Exercise 2

Create an inverted index with the following structure:
- disaster_type : regions

In [8]:
%%time
!rm -rf {output_rdd_ex2_path}

import pyspark
from operator import add

def toCSVLine(data):
    """Build one comma-separated line from the elements of ``data``.

    Elements are stringified with ``str`` and joined with ``','``;
    nothing is quoted or escaped.
    """
    as_text = [str(item) for item in data]
    return ','.join(as_text)

sc = pyspark.SparkContext('local[*]')
try:
    lines = sc.textFile(data_big_path)
    # Split each record once, then pair column 3 (the disaster-type key)
    # with column 6 (the region value).
    fields = lines.map(lambda line: line.split(','))
    disasters_regions = fields.map(lambda cols: (cols[3], cols[6]))
    # Gather the distinct values seen for every key.
    invIndexDisastersRegions = disasters_regions.groupByKey().mapValues(set)
    # NOTE(review): each value is a Python set, so toCSVLine writes the
    # set's repr (braces, quotes, unordered) after the key. Confirm that
    # this is the intended output format.
    invIndexDisastersRegions_csv = invIndexDisastersRegions.map(toCSVLine)
    invIndexDisastersRegions_csv.saveAsTextFile(output_rdd_ex2_path)
except Exception as e:
    # Best effort: report the failure in the cell output instead of raising.
    print(e)
finally:
    # Always release the context (also on KeyboardInterrupt) so that the
    # next cell is able to create a fresh SparkContext.
    sc.stop()

CPU times: user 70 ms, sys: 20 ms, total: 90 ms
Wall time: 27.7 s


# Exercise 3

Using an inverted index, solve: <br>

What is the probability of being injured, and the probability of dying, in a natural disaster of type T
on continent C during decade D?

In [11]:
%%time
import pyspark
from operator import add

def nan_to_zero(a: str) -> str:
    """
    Replace a missing field with the string "0".

    Args:
        a: a raw field value. Any falsy value (for example the empty
           string) counts as missing.

    Returns:
        ``"0"`` when ``a`` is missing, otherwise ``a`` unchanged. The result
        stays a string; conversion to ``int`` is left to the caller.
    """
    return a if a else "0"

def calculate_proba(num: int, den: int, decimals=3):
    """
    Return the ratio ``num / den`` rounded to ``decimals`` places.

    Args:
        num: numerator.
        den: denominator.
        decimals: number of decimal places kept in the result.

    Returns:
        ``0`` when both ``num`` and ``den`` are zero, otherwise the
        rounded quotient.

    Raises:
        ZeroDivisionError: if ``den`` is zero while ``num`` is not.
    """
    if num == den == 0:
        return 0
    # round() takes (number, ndigits): divide first, then round the quotient.
    return round(num / den, decimals)

def calculate_probas(lists_of_tokens: list):
    """
    Compute the injured and death ratios over a group of records.

    Each record is indexed as ``[deaths, injured, total_affected]``. The
    three columns are summed over the whole group and the injured and
    death totals are divided by the total_affected sum, rounded to three
    decimals.

    Returns:
        ``[injured_proba, death_proba]``; both are ``0`` when all three
        sums are zero.
    """
    deaths = sum(record[0] for record in lists_of_tokens)
    injured = sum(record[1] for record in lists_of_tokens)
    affected = sum(record[2] for record in lists_of_tokens)

    # Nothing recorded at all for this group: report zero for both ratios.
    if injured == 0 and deaths == 0 and affected == 0:
        return [0, 0]

    death_ratio = round(deaths / affected, 3)
    injured_ratio = round(injured / affected, 3)
    return [injured_ratio, death_ratio]

def toCSVLine(data):
    """Return the elements of ``data`` as one comma-separated line.

    Each element goes through ``str``; no quoting or escaping is applied.
    """
    return ','.join(map(str, data))


# Indices of the columns that are relevant for this exercise,
# respectively: 0:decade (read as a year and truncated to its decade below),
# 3:disaster, 5:continent, 10:total_deaths, 11:injured, 12:affected,
# 13:homeless, 14:total_affected.
columns = [0,3,5,10,11,12,13,14]

# saveAsTextFile refuses to write into an existing directory, so remove any
# output left by a previous run (the Exercise 1 and 2 cells do the same
# with `!rm -rf`). A missing directory is silently ignored.
shutil.rmtree(output_rdd_ex3_path, ignore_errors=True)

sc = pyspark.SparkContext('local[*]')
try:
    lines = sc.textFile(data_big_path)
    tokenized = lines.map(lambda line: line.split(','))
    # select relevant columns for this exercise
    relevant_tokens = tokenized.map(lambda tokens: [token for idx, token in enumerate(tokens) if idx in columns])
    # convert missing values to "0"
    fix_nans = relevant_tokens.map(lambda tokens: [nan_to_zero(e) for e in tokens])
    # convert numeric fields from string to int (the first three stay strings)
    fix_ints = fix_nans.map(lambda tokens: tokens[:3] + [int(e) for e in tokens[3:]])
    # convert years to decades by replacing the last character with '0'
    fix_decades = fix_ints.map(lambda tokens: [tokens[0][:-1]+'0'] + tokens[1:])

    # add a new field that is the sum of total_deaths,
    # injured, affected and homeless for each row.
    custom_total_affected = fix_decades.map(lambda tokens: tokens + [sum(tokens[3:7])])

    # if the total_affected field is smaller than the sum of each
    # type of affected, then replace its value with the sum; either way
    # the helper field appended above is dropped again.
    fixed_total_affected = custom_total_affected.map(
        lambda tokens: tokens[:-2] + [max(tokens[-2], tokens[-1])])

    # dataset has been preprocessed, we can start solving the exercise

    # create a single key "decade_disaster_continent"; the values are
    # a list: [death_count, injured_count, total_affected_count]
    key_value = fixed_total_affected.map(lambda tokens: (f"{tokens[0]}_{tokens[1]}_{tokens[2]}", [tokens[3],tokens[4],tokens[-1]]))
    groups = key_value.groupByKey()
    probas = groups.mapValues(calculate_probas)
    sorted_keys = probas.sortByKey(ascending=True)

    # NOTE(review): each value is the list [injured_proba, death_proba], so
    # toCSVLine writes the list's repr (brackets included) after the key.
    # Confirm that this is the intended output format.
    sorted_keys_csv = sorted_keys.map(toCSVLine)
    sorted_keys_csv.saveAsTextFile(output_rdd_ex3_path)
except Exception as e:
    # Best effort: report the failure in the cell output instead of raising.
    print(e)
finally:
    # Always release the context (also on KeyboardInterrupt) so that a
    # later cell is able to create a fresh SparkContext.
    sc.stop()

CPU times: user 80 ms, sys: 30 ms, total: 110 ms
Wall time: 34.8 s
