# Data Cleansing and Feature Engineering

## 1. Setting Up Spark Context

In [29]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [30]:
# Obtain the SparkContext (creating one with a ``local[*]`` master if none exists yet).
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Obtain the SparkSession used by every DataFrame operation in this notebook.
spark = SparkSession.builder.getOrCreate()

## 2. Download data from Object Store

In [31]:
import os
import getpass

def get_or_set_environment_variable(variable):
    """Return the value of environment variable ``variable``, asking for it if unset.

    When the variable is not present in ``os.environ`` the user is prompted
    through ``getpass`` (input is not echoed). In both cases the resulting
    value is written back to ``os.environ`` before it is returned, so a later
    call in the same process does not prompt again.

    Parameters
    ----------
    variable : str
        Name of the environment variable.

    Returns
    -------
    str
        The value found in the environment or typed by the user.
    """
    value = os.environ.get(variable)
    if value is None:
        value = getpass.getpass('Please enter value for {:}: '.format(variable))

    os.environ[variable] = value
    return value

# Object Storage API key and bucket name: taken from the environment when set,
# otherwise requested interactively (see get_or_set_environment_variable).
ibm_api_key_id = get_or_set_environment_variable('IBM_API_KEY_ID')
ibm_cloud_store_bucket = get_or_set_environment_variable('IBM_CLOUD_STORE_BUCKET')

In [32]:
import json
import os

import types
from botocore.client import Config
import ibm_boto3

# Placeholder that gets bound (via types.MethodType) onto an Object Storage response
# body when that object has no ``__iter__`` attribute. It returns the constant 0 rather
# than an iterator, so its only effect is to make ``hasattr(body, "__iter__")`` true.
def __iter__(self): return 0

# @hidden_cell
# Create the IBM Cloud Object Storage client. The API key is not written in this cell;
# it comes from `ibm_api_key_id`, which was read from the environment or prompted for.
client = ibm_boto3.client(service_name='s3',
    ibm_api_key_id=ibm_api_key_id,
    ibm_auth_endpoint="https://iam.cloud.ibm.com/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com')

# Download the manifest object; its JSON maps dataset names to lists of parquet keys.
body = client.get_object(Bucket=ibm_cloud_store_bucket,
                         Key='etl_parquet_files.json')['Body']
# Attach the placeholder __iter__ when the body lacks one (shim kept from the original
# snippet, which mentions pandas). NOTE(review): json.load below only reads the body,
# so this shim is probably unnecessary here -- confirm before removing.

if not hasattr(body, "__iter__"): body.__iter__ = types.MethodType( __iter__, body )

files = json.load(body)
files

{'train': ['desaster_detection_train-0000.parquet'],
 'label': ['desaster_detection_label-0000.parquet'],
 'test': ['desaster_detection_test-0000.parquet']}

In [33]:
def load_dataframe(files):
    """Download parquet objects from the bucket and return them as one Spark DataFrame.

    Each key in ``files`` is fetched with the notebook-level ``client`` from
    ``ibm_cloud_store_bucket``, written to a ``temp_<key>`` file in the current
    working directory and opened with ``spark.read.parquet``. The resulting
    DataFrames are combined with ``DataFrame.union`` in the order the keys were
    given (first key first).

    The temp files are deliberately left on disk: Spark evaluates lazily, so the
    files must still exist when the returned DataFrame is eventually computed.

    Parameters
    ----------
    files : iterable of str
        Object keys of the parquet part files to load.

    Returns
    -------
    pyspark.sql.DataFrame
        Union of all parts, in input order.

    Raises
    ------
    ValueError
        If ``files`` is empty.
    """
    keys = list(files)
    if not keys:
        raise ValueError('load_dataframe() needs at least one object key')

    frames = []
    for key in keys:
        body = client.get_object(Bucket=ibm_cloud_store_bucket,
                                 Key=key)['Body']

        # Object keys may contain '/'; flatten them so the temp file always lands
        # directly in the working directory. Keys without '/' are unaffected.
        local_path = 'temp_{:}'.format(key.replace('/', '_'))
        with open(local_path, 'wb') as temp:
            # Only .read() is needed here, so no __iter__ shim is required.
            temp.write(body.read())
        frames.append(spark.read.parquet(local_path))

    # Fold left-to-right so rows from the first file come first. The previous
    # implementation started from the last file, which reordered multi-part inputs.
    combined = frames[0]
    for frame in frames[1:]:
        combined = combined.union(frame)
    return combined

# Load the training and test splits listed in the manifest.
df_train = load_dataframe(files['train'])
df_test = load_dataframe(files['test'])

In [34]:
# Sanity check: both splits should expose exactly the same schema.
df_train.schema == df_test.schema

True

In [43]:
# Preview the first 10 training rows as a pandas DataFrame.
df_train.limit(10).toPandas()

Unnamed: 0,id,text
0,1,Our Deeds are the Reason of this #earthquake M...
1,4,Forest fire near La Ronge Sask. Canada
2,5,All residents asked to 'shelter in place' are ...
3,6,"13,000 people receive #wildfires evacuation or..."
4,7,Just got sent this photo from Ruby #Alaska as ...
5,8,#RockyFire Update => California Hwy. 20 closed...
6,10,#flood #disaster Heavy rain causes flash flood...
7,13,I'm on top of the hill and I can see a fire in...
8,14,There's an emergency evacuation happening now ...
9,15,I'm afraid that the tornado is coming to our a...


## 3. Data Cleansing

In [44]:
def drop_unused_cols(df, cols=('location', 'keyword')):
    """Return ``df`` without the columns named in ``cols``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame; it is not modified, ``DataFrame.drop`` returns a new frame.
    cols : str or iterable of str, optional
        Column name(s) to remove. Defaults to ``('location', 'keyword')``, the
        columns this notebook does not use.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` minus the requested columns.
    """
    if isinstance(cols, str):
        # A bare string would otherwise be unpacked character by character.
        cols = (cols,)
    return df.drop(*cols)

# Prune both splits; the names df_train / df_test are rebound to the pruned frames.
df_train = drop_unused_cols(df_train)
df_test = drop_unused_cols(df_test)

# Preview the first 10 rows of the pruned training split.
df_train.limit(10).toPandas()

Unnamed: 0,id,text
0,1,Our Deeds are the Reason of this #earthquake M...
1,4,Forest fire near La Ronge Sask. Canada
2,5,All residents asked to 'shelter in place' are ...
3,6,"13,000 people receive #wildfires evacuation or..."
4,7,Just got sent this photo from Ruby #Alaska as ...
5,8,#RockyFire Update => California Hwy. 20 closed...
6,10,#flood #disaster Heavy rain causes flash flood...
7,13,I'm on top of the hill and I can see a fire in...
8,14,There's an emergency evacuation happening now ...
9,15,I'm afraid that the tornado is coming to our a...


## 4. Serializing the dataframes in *Parquet* format

In [37]:
!rm -r ./desaster_detection_*

In [40]:
import glob

# Output path template; '{}' is replaced by the dataset name ('train' / 'test').
# NOTE(review): there is no separator before '{}', so the directories are named
# 'desaster_detection_cleantrain' and 'desaster_detection_cleantest'. The uploaded
# object keys are derived from these names, so confirm with consumers before changing.
temp_parquet_file = os.path.join(os.path.curdir,
                                 'desaster_detection_clean{}')
df_train.write.parquet(temp_parquet_file.format('train'), mode='overwrite')
df_test.write.parquet(temp_parquet_file.format('test'), mode='overwrite')

# List the output directories that now exist.
glob.glob(temp_parquet_file.format('*'))

['./desaster_detection_cleantest', './desaster_detection_cleantrain']

## 5. Uploading the files to Cloud Object Storage

In [41]:
def upload_parquet(client, path, bucket=None):
    """Upload every ``*.parquet`` part file found directly inside ``path``.

    Part files are processed in sorted filename order and stored under the keys
    ``<dirname>-0000.parquet``, ``<dirname>-0001.parquet``, ... where
    ``<dirname>`` is the last component of ``path``.

    Parameters
    ----------
    client
        Object Storage client exposing ``put_object(Bucket=, Body=, Key=)``.
    path : str
        Directory containing the parquet part files (a trailing path separator
        is tolerated).
    bucket : str, optional
        Target bucket. Defaults to the notebook-level ``ibm_cloud_store_bucket``.

    Returns
    -------
    list of str
        The object keys that were written, in upload order. Empty if ``path``
        contains no ``*.parquet`` files.
    """
    if bucket is None:
        bucket = ibm_cloud_store_bucket

    # normpath strips a trailing separator so the key prefix is never empty.
    prefix = os.path.basename(os.path.normpath(path))

    # glob returns matches in arbitrary order; sorting makes the mapping from
    # part file to numbered key reproducible from run to run.
    parts = sorted(glob.glob(os.path.join(path, '*.parquet')))

    parquets = []
    for index, part in enumerate(parts):
        key = '{:s}-{:04d}.parquet'.format(prefix, index)
        with open(part, 'rb') as parquet_file:
            client.put_object(Bucket=bucket,
                              Body=parquet_file,
                              Key=key)
        parquets.append(key)
    return parquets

# Re-create the Object Storage client with the same settings used for the download.
# NOTE(review): this duplicates the earlier construction; presumably it is repeated
# so this section can be run on its own -- confirm.
client = ibm_boto3.client(service_name='s3',
    ibm_api_key_id=ibm_api_key_id,
    ibm_auth_endpoint="https://iam.cloud.ibm.com/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com')


# Upload the part files of each cleaned dataset and record the resulting object keys.
parquets = {}
for dataset in ('train', 'test'):
    parquets[dataset] = upload_parquet(client, temp_parquet_file.format(dataset))

print(parquets)

{'train': ['desaster_detection_cleantrain-0000.parquet'], 'test': ['desaster_detection_cleantest-0000.parquet']}


In [42]:
import json


# The label files are not touched by this notebook: carry their keys over unchanged
# from the input manifest, then store the combined mapping as a JSON object.
parquets['label'] = files['label']
client.put_object(Bucket=ibm_cloud_store_bucket,
                  Body=json.dumps(parquets),
                  Key='feature_eng_parquet_files.json')

{'ResponseMetadata': {'RequestId': '0a300fde-e02a-424b-9c46-798cab505d66',
  'HostId': '',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 15 Jan 2021 16:09:20 GMT',
   'x-clv-request-id': '0a300fde-e02a-424b-9c46-798cab505d66',
   'server': 'Cleversafe',
   'x-clv-s3-version': '2.5',
   'x-amz-request-id': '0a300fde-e02a-424b-9c46-798cab505d66',
   'etag': '"32a7a4a0557df12cefd21b61af931764"',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"32a7a4a0557df12cefd21b61af931764"'}