@rmoff / Rittman Mead
October 20, 2016

# Initialisation

In [1]:
import os

# Read the AWS credentials from the environment instead of hard-coding them in the
# notebook (where they would end up in version control / shared copies). The
# placeholder values are only used when the variables are not set.
access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'XXXXXXXXX')
secret = os.environ.get('AWS_SECRET_ACCESS_KEY', 'XXXXXXXXX')
bucket_name = 'foobar-bucket'

In [2]:
import os

# Export the AWS credentials so boto (and, presumably, Spark's s3n:// filesystem)
# can pick them up from the environment.
os.environ.update({
    'AWS_ACCESS_KEY_ID': access_key,
    'AWS_SECRET_ACCESS_KEY': secret,
})

# make sure pyspark tells workers to use python2
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'

# Extra JVM packages: hadoop-aws + AWS SDK for S3 access, spark-csv for CSV I/O.
# These need to be in place before the SparkContext below is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk-pom:1.10.34,com.databricks:spark-csv_2.11:1.3.0 pyspark-shell'

In [3]:
import pyspark

# Local-mode Spark using all available cores, plus an SQLContext for the DataFrame/SQL API
sc = pyspark.SparkContext(master='local[*]')
sqlContext = pyspark.SQLContext(sparkContext=sc)

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit

To install the `boto` package on the all-spark-notebook Docker container, run the following from within the Docker guest (you can do this via a Jupyter Terminal):

    /opt/conda/envs/python2/bin/pip install boto
    
On other platforms the path to `pip` will vary, but the install command is the same.

In [6]:
import boto

In [7]:
from urlparse import urlsplit

## Import the acme CSV (from local disk)

Using the Spark SQL [load](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load) method, and the spark-csv package.

In [None]:
# Local copy of the acme extract. (Another sample in the same directory is
# acme_GB_20160923_112850.csv -- swap the path to use it.)
acme_file = '/home/jovyan/work/csv/acme.csv'

# spark-csv reader: first line is the header, column types are inferred from the data
acme_df = sqlContext.read.load(acme_file,
                               format='com.databricks.spark.csv',
                               header='true',
                               inferSchema='true')

acme_df.printSchema()

## Import the acme CSV (from S3)

### Connect to the bucket

In [8]:
# boto reads the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables set earlier
conn_s3 = boto.connect_s3()
bucket = conn_s3.get_bucket(bucket_name, validate=True)

### List bucket contents

This is where iterative processing over multiple files would be driven from; for now it just selects a single file to process.

In [9]:
# List every key under the prefix (this is where a per-file processing loop would go),
# but only keep the FIRST one for processing for now.
contents = bucket.list(prefix='processed/acme_BR_20160803_100000')
acme_file = None
for f in contents:
    print(f.name)
    print(f.size)
    if acme_file is None:
        acme_file = f.name

# Fail loudly rather than silently carrying on with a stale acme_file from an earlier cell
if acme_file is None:
    raise ValueError('No objects found in bucket %s under the requested prefix' % bucket_name)

print("\n\n--\nFile to process: %s" % acme_file)

processed/acme_BR_20160803_100000.csv
21229079


--
File to process: processed/acme_BR_20160803_100000.csv


### [Testing] Copy file from one bucket to another

In [None]:
src = conn_s3.get_bucket('bucket-a')
dst = conn_s3.get_bucket('bucket-b')
contents = src.list(prefix='source/')
for f in contents:
    source_name = f.name
    # S3 keys always use '/' as the separator, so split on that rather than os.path.
    # NOTE: keys in nested "folders" are flattened, so equal basenames overwrite each other.
    target_name = source_name.rsplit('/', 1)[-1]
    if not target_name:
        # "folder" placeholder keys (e.g. 'source/') have no file name to copy to
        continue
    print('%s %s' % (source_name, target_name))
    dst.copy_key(new_key_name=target_name,
                 src_bucket_name=src.name,
                 src_key_name=source_name)

### [Testing] Create a dummy text file

In [None]:
# Write a small text object called 'foobar' into the bucket
k = boto.s3.key.Key(bucket, 'foobar')
k.set_contents_from_string('This is a test of S3')

### [Testing] Delete a file

In [None]:
# Remove the 'foobar' test object again
k = boto.s3.key.Key(bucket, 'foobar')
k.delete()

### [Testing] Copy a local file to the bucket

In [None]:
# Upload the local sample CSV. Use an explicit local path: by this point `acme_file`
# has been overwritten with the S3 key name picked from the bucket listing above,
# which is not a file on local disk.
local_acme_file = '/home/jovyan/work/csv/acme.csv'
k = boto.s3.key.Key(bucket)
k.key = 'acme-sample.csv'
k.set_contents_from_filename(local_acme_file)

### [Testing] Get the CSV file from the bucket into a string

In [None]:
# Download the selected acme object's full contents into memory
k = boto.s3.key.Key(bucket, acme_file)
acme_csv = k.get_contents_as_string()

### Read the CSV from S3 into Spark dataframe

In [None]:
# s3n:// URI of the selected object, used by the Spark reader below
full_uri = "s3n://{0}/{1}".format(bucket_name, acme_file)
print(full_uri)

In [None]:
print('Loading acme data from: %s' % full_uri)

# spark-csv reader: header row present, column types inferred
acme_df = (sqlContext.read
           .format('com.databricks.spark.csv')
           .options(header='true', inferSchema='true')
           .load(full_uri))

acme_df.printSchema()

### Get country from filename

In [13]:
import re

# The country code is the token between "acme_" and the next underscore in the file
# name, e.g. acme_BR_20160803_100000.csv -> BR; falls back to 'NA' when it doesn't match.
# NOTE(review): 'NA' is also the ISO code for Namibia -- confirm that sentinel is acceptable.
filename = os.path.split(acme_file)[1]
country_match = re.search('acme_([^_]+)_.+$', filename)
country = country_match.group(1) if country_match is not None else 'NA'

print("Country determined from filename '%s' as : %s" % (filename, country))

Country determined from filename 'acme_BR_20160803_100000.csv' as : BR


### Add country as a column to the data

In [16]:
# Tag every row with the country parsed from the file name (a constant literal column)
acme_df = acme_df.withColumn(colName='country', col=lit(country))
acme_df.printSchema()

root
 |-- product: string (nullable = true)
 |-- product_desc: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- date_launched: timestamp (nullable = true)
 |-- position: string (nullable = true)
 |-- product_attrib_01: string (nullable = true)
 |-- url: string (nullable = true)
 |-- product_attrib_02: string (nullable = true)
 |-- status: string (nullable = true)
 |-- reject_reason: string (nullable = true)
 |-- country: string (nullable = false)



# Deduplicate acme data

Based on the url field

In [18]:
acme_deduped_df = acme_df.dropDuplicates(subset=['url'])  # keep one row per url value

In [19]:
# Row counts before/after de-duplication (each count() runs a Spark job)
orig_count, deduped_count = acme_df.count(), acme_deduped_df.count()
removed_count = orig_count - deduped_count
print("Original count: %d\nDeduplicated count: %d\n\n" % (orig_count, deduped_count))
print("Number of removed duplicate records: %d" % removed_count)

Original count: 97974
Deduplicated count: 96706


Number of removed duplicate records: 1268


### Create a dataframe of duplicates for analysis if required 

Using [subtract](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract) doesn't work, because every duplicated record still has one surviving entry in the de-duplicated set (that's what de-duplication does). Instead we'll use plain SQL with a `GROUP BY ... HAVING COUNT(*) > 1`.

Deduplicated URLs:

    acme_deduped_df.select('url').collect()

Original list of URLs:

    acme_df.select('url').collect()

In [None]:
acme_df.registerTempTable("acme")

# url values that occur more than once -- the same key dropDuplicates(['url']) uses,
# so this covers every record involved in the de-duplication, not just rows that are
# identical in every column. (Rows with a null url are not matched by the join below.)
duplicate_urls_df = sqlContext.sql("SELECT url FROM acme GROUP BY url HAVING COUNT(*) > 1")

# Full records for those urls, keeping acme_df's original column order
duplicates_df = acme_df.join(duplicate_urls_df, 'url').select(*acme_df.columns)

Write this to file for reference

In [None]:
# Duplicates are written under acme_duplicates/, named after the source file
duplicated_acme_filename = 'acme_duplicates/duplicates.%s' % (filename,)
full_uri = "s3n://{0}/{1}".format(bucket_name, duplicated_acme_filename)

In [None]:
print('Writing records which are duplicated to %s' % full_uri)

# coalesce(1) -> a single output part file instead of one per partition
single_file_writer = duplicates_df.coalesce(1).write
single_file_writer.save(full_uri,
                        format='com.databricks.spark.csv',
                        header='false',
                        partitionBy='',
                        mode='overwrite')

## Strip the url field to give the domain alone, which is the join key to `sites`

Test the `urlsplit` function from the `urlparse` library on a sample URL, to check that its `netloc` gives us the domain we need.

In [20]:
# Check that urlsplit(...).netloc yields just the domain for a typical URL
sample_url = 'https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/'
sample_parts = urlsplit(sample_url)
print(sample_url)
print(sample_parts.netloc)

https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/
www.rittmanmead.com


## Add the `netloc` column to the dataframe

This works by using a [UDF](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf) to apply `urlsplit(...).netloc` to each value of the `url` column.

In [None]:
def getDomain(value):
    """Return the network-location (domain) part of a URL string.

    e.g. 'https://www.rittmanmead.com/blog/' -> 'www.rittmanmead.com'.
    The netloc is returned exactly as urlsplit gives it, so any port or
    credentials in the URL are kept.

    Used as a Spark UDF over the nullable `url` column: a null value returns
    None (a SQL null) instead of failing inside urlsplit on the worker.
    """
    if value is None:
        return None
    return urlsplit(value).netloc

# Wrap getDomain as a Spark UDF (no returnType given, so pyspark's default StringType applies)
udfgetDomain = udf(f=getDomain)

This uses `withColumn`, which lets us specify the new column's name.
The alternative is `select`, but then the new column gets an auto-generated name and would need aliasing:

    acme_deduped_df_with_netloc = acme_deduped_df.select('*',udfgetDomain(acme_deduped_df.url))
    
`withColumn` therefore seems the tidier solution.

In [None]:
acme_deduped_df_with_netloc = acme_deduped_df.withColumn("netloc", udfgetDomain(acme_deduped_df['url']))  # domain of each url

Inspect the modified dataset (first five rows)

In [None]:
acme_deduped_df_with_netloc.select('date_launched', 'url', 'netloc').show(n=5, truncate=False)  # first five rows, untruncated

### Data inspection

In [None]:
# Re-register "acme" (replacing the earlier acme_df registration) so SQL sees the netloc column
acme_deduped_df_with_netloc.registerTempTable("acme")

top_netlocs_sql = "SELECT netloc,count(*) from acme group by netloc order by 2 desc"
top_product_netlocs_sql = "SELECT product,netloc,count(*) as cnt from acme group by product,netloc order by cnt desc"
for query in (top_netlocs_sql, top_product_netlocs_sql):
    sqlContext.sql(query).show(10)

# Join to sites reference data

## Import the sites CSV

Using the Spark SQL [load](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load) method, and the spark-csv package.

_**This should be possible to do directly against Oracle using SparkSQL's JDBC connectivity** - if it were appropriate (i.e. the load wasn't excessive, and freshness of data was important). Alternatively, keep dumping Oracle to a flat file and importing it into the Spark job each time._

In [None]:
# Sites reference data lives in the same bucket as the acme files; use the configured
# bucket_name instead of repeating the literal so changing the bucket is a one-line edit.
sites_file = "s3n://{}/{}".format(bucket_name, 'sites.csv')
sites_df = sqlContext.read.load(sites_file,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')

Preview the schema

In [None]:
# Print the column names and the types spark-csv inferred (inferSchema='true' on load)
sites_df.printSchema()

### Data Quality / Inspection

Preview the first ten rows of data

In [None]:
# count() was previously a bare, non-final expression, so its result was computed
# (a full Spark job) and then thrown away; print it so it is actually visible.
print('Sites rows: %d' % sites_df.count())
sites_df.select('ID', 'SITE', 'SITE_RETAIL_TYPE').show(10, truncate=False)

*If `SITE_RETAIL_TYPE` is blank, is that in effect NULL?*

List the first ten rows in the reference file whose `SITE` isn't blank

In [None]:
# First ten sites (by SITE) that have a non-blank SITE value
(sites_df.where("SITE != ''")
         .orderBy('SITE')
         .select('ID', 'SITE', 'SITE_RETAIL_TYPE')
         .show(10, truncate=False))

Count & list blank `SITE` entries (in case `SITE` is blank but there's other relevant data present that we need to preserve)

In [None]:
# Rows whose SITE is an empty string; the final count() is the cell's displayed value
blanks = sites_df.sort('SITE').filter(sites_df.SITE == '')
blanks.show(10)
blanks.count()

Count & list blank `SITE_RETAIL_TYPE` entries (in case `SITE_RETAIL_TYPE` is blank but there's other relevant data present that we need to preserve)

In [None]:
# Rows whose SITE_RETAIL_TYPE is an empty string; the final count() is the cell's displayed value
blanks = sites_df.sort('SITE').filter(sites_df.SITE_RETAIL_TYPE == '')
blanks.show(10)
blanks.count()

### Data cleanse

#### Remove blank SITE entries, and blank SITE_RETAIL_TYPE entries

*Maybe we need more subtlety to this, e.g. only drop blank SITE_RETAIL_TYPE if the `SITE_CATEGORY` is `Uncategorized` (so as not to drop useful data)?*

In [None]:
sites_before = sites_df.count()  # baseline for the pruning report
print('Sites before pruning: %d' % (sites_before,))

In [None]:
# Keep only rows where SITE and SITE_RETAIL_TYPE are both non-blank
# (De Morgan form of NOT (SITE = '' OR SITE_RETAIL_TYPE = ''); equivalent under SQL null logic)
sites_pruned_df = sites_df.filter("SITE != '' AND SITE_RETAIL_TYPE != ''")
sites_after = sites_pruned_df.count()

In [None]:
pruned_count = sites_before - sites_after
print('Sites after pruning: %d' % sites_after)
print('Sites pruned: %d' % pruned_count)

In [None]:
sites_pruned_df.select('ID', 'SITE', 'SITE_CATEGORY', 'SITE_RETAIL_TYPE').show(n=20, truncate=False)  # cleansed sites preview

## Perform the join 

List out the first ten sites in the acme file

In [None]:
# First ten distinct domains in the acme data, alphabetically. De-duplicate first and
# sort last: dropDuplicates isn't guaranteed to preserve the order of an earlier sort.
acme_deduped_df_with_netloc.select('netloc').distinct().sort('netloc').show(10)

In [None]:
# First ten distinct SITE values in the cleansed reference data, alphabetically
# (de-duplicate first, sort last, so the displayed order is actually sorted).
sites_pruned_df.select('SITE').distinct().sort('SITE').show(10, truncate=False)

In [None]:
# Left outer join: every acme row is kept; the sites columns (ID, SITE, ...) are null when
# no site matches. Join against the CLEANSED sites data -- the un-pruned sites_df would
# let blank SITE rows match urls whose netloc is empty and multiply those rows.
# NOTE(review): if SITE is not unique in the reference data, matching acme rows are duplicated.
merged_df = acme_deduped_df_with_netloc.join(sites_pruned_df,
                                             acme_deduped_df_with_netloc.netloc == sites_pruned_df.SITE,
                                             'left_outer')

Inspect the first ten rows

In [None]:
preview_cols = ['date_launched', 'url', 'netloc', 'ID', 'SITE', 'SITE_RETAIL_TYPE']
merged_df.select(*preview_cols).show(10)

First 10 matched: 

In [None]:
# Rows that found a site (ID populated by the join)
(merged_df.where(merged_df['ID'].isNotNull())
          .select('date_launched', 'url', 'netloc', 'ID', 'SITE', 'SITE_RETAIL_TYPE')
          .show(10))

First 10 unmatched: 

In [None]:
# Rows with no matching site (ID left null by the outer join)
(merged_df.where(merged_df['ID'].isNull())
          .select('date_launched', 'url', 'netloc', 'ID', 'SITE', 'SITE_RETAIL_TYPE')
          .show(10))

How many records weren't joined to the sites reference? 

In [None]:
# acme records the join could not attach a site to (ID is null after the left outer join)
unmatched_df = merged_df.filter(merged_df.ID.isNull())
# The old "Unmatched domains" figure was really a record count; report both explicitly.
print("Unmatched records: %d" % unmatched_df.count())
print("Unmatched domains: %d" % unmatched_df.select('netloc').distinct().count())

Save the unmatched sites (`netloc`) to file. First do a `coalesce` (to a single partition) so that there's a single file, not hundreds of files in a folder (since the df is partitioned)

In [None]:
# Unmatched sites go under acme_unmatched_sites/, named after the source file.
# (Previously the URI was built from duplicated_acme_filename, so this output
# overwrote the duplicates file instead.)
unmatched_sites_filename = 'acme_unmatched_sites/unmatched_sites.%s' % filename
full_uri = "s3n://{}/{}".format(bucket_name, unmatched_sites_filename)

In [None]:
print('Writing list of sites that are unmatched to %s' % full_uri)

# unmatched_df has one row per acme record, so de-duplicate to get a list of sites;
# coalesce(1) -> a single output part file.
unmatched_df.select('netloc').distinct().coalesce(1).write.save(full_uri,
                                                                format='com.databricks.spark.csv',
                                                                header='false',
                                                                partitionBy='',
                                                                mode='overwrite')

## Write the joined file to CSV

Save the joined data to file. First do a `coalesce` (to a single partition) so that there's a single file, not hundreds of files in a folder (since the df is partitioned)

In [None]:
acme_enriched_filename = 'acme_enriched/enriched.%s' % filename
full_uri = "s3n://{}/{}".format(bucket_name, acme_enriched_filename)

print('Writing enriched acme data to %s' % full_uri)

# coalesce(1) so the output is a single part file rather than one per partition,
# matching the other writes in this notebook.
merged_df.coalesce(1).write.save(full_uri,
                                 format='com.databricks.spark.csv',
                                 header='false',
                                 mode='overwrite')
