<h5> Objectives </h5>
<ol>
  <li> Reads data from S3
  <li> Creates the schema
  <li> Writes the data back to S3 as parquet files, so that it is easy to query and faster to use
</ol>

In [2]:
#uncomment for jupyter notebook
#import findspark
#findspark.init()

#import pyspark
#sc = pyspark.SparkContext()
#spark = pyspark.sql.SparkSession(sc)

In [3]:
# General imports
import matplotlib.pyplot as pyplt
import numpy as np
from ggplot import *
from datetime import datetime

import pyspark.sql.functions as sf
from pyspark.sql import Row


<h4> Helper functions </h4>

In [5]:
def getInt(x):
    """Convert `x` to an int, falling back to 0 when it cannot be converted.

    x: value to convert (the callers in this notebook pass raw CSV field strings).
    returns: int(x), or 0 if int() rejects the value.
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures get the default. A bare `except:` would also
        # swallow KeyboardInterrupt / SystemExit and hide unrelated errors.
        return 0


def getDate(x):
    """Parse a "YYYYMMDD" string into a date, defaulting to 1901-01-01.

    x: value to parse (expected to be a string in %Y%m%d format).
    returns: a datetime.date; 1901-01-01 is returned as a sentinel when `x`
             is not a string or does not match the format.
    """
    try:
        return datetime.strptime(x, "%Y%m%d").date()
    except (TypeError, ValueError):
        # TypeError: x is not a string; ValueError: x does not match the format.
        # The sentinel is built directly instead of re-parsing a string literal.
        return datetime(1901, 1, 1).date()

def nothing(x):
    """Identity extractor: hand the field back exactly as it was received."""
    unchanged = x
    return unchanged

def getFloat(x):
    """Convert `x` to a float, falling back to 0.0 when it cannot be converted.

    x: value to convert (the callers in this notebook pass raw CSV field strings).
    returns: float(x), or 0.0 if float() rejects the value.
    """
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # The default is the float 0.0, not the int 0, so every value produced
        # by this extractor has the same Python type.
        return 0.0


# Rename every column of the dataframe, by position, using the list of new names
def renameCols(df, newNames):
    """Return `df` with its columns renamed positionally to `newNames`.

    df: dataframe whose columns are to be renamed.
    newNames: sequence of new column names, one per existing column, in order.
    returns: a new dataframe with the same data and the new column names.
    raises: AssertionError if the number of names does not match the number
            of columns.
    """
    assert len(df.schema.names) == len(newNames), \
        "expected %d column names, got %d" % (len(df.schema.names), len(newNames))
    # toDF renames all columns in one step, purely by position. Renaming one
    # column at a time with withColumnRenamed matches on the *name*, so a new
    # name that equals a not-yet-renamed column (e.g. swapping two names) would
    # later rename both columns.
    return df.toDF(*newNames)

# Takes a list of functions and applies them to the row, one function per field.
# input: array of strings
# output: tuple (element types are determined by the return types of the functions)
def getTupleFromSchema(l, colExtractors):
    """Convert the fields of one row with the extractor at the same position.

    l: list of raw field values for a single row.
    colExtractors: list of one-argument functions, one per field.
    returns: tuple of the converted values, in field order.
    raises: AssertionError if the row and the extractor list differ in length.
    """
    assert len(l) == len(colExtractors)
    converted = []
    for position, field in enumerate(l):
        extract = colExtractors[position]
        converted.append(extract(field))
    return tuple(converted)


In [6]:

# These functions generate a dataframe from a CSV file
# fname: name of the CSV file from which to get the dataframe
# colExtractors: a list of functions, one per column, used to convert each raw
#                string field to the type wanted in the schema
# colExtractors are assumed to handle their own errors

def _splitCsvLine(line):
    """Drop every space from `line` and split it into fields on commas."""
    return line.replace(' ', '').split(',')


def _readHeaderAndRows(fname):
    """Read `fname` with the notebook's SparkContext `sc`; separate header from data.

    returns: (names, rows) where `names` is the list of column names taken
             from the first line, each stripped of surrounding whitespace,
             and `rows` is an RDD of (line, index) pairs for every line
             after the first.
    """
    inRDD = sc.textFile(fname)

    # take(1) yields a list with at most one line; join turns it into a string
    header = ''.join(inRDD.take(1))
    # strip the names so that "a, b" yields "b", matching the data fields,
    # which have their spaces removed before being split
    names = [name.strip() for name in header.split(',')]

    # index 0 is the first line, which is assumed to hold the column names
    rows = inRDD.zipWithIndex().filter(lambda r: r[1] > 0)
    return names, rows


def getDF(fname, colExtractors):
    """Build a dataframe from the CSV file `fname`.

    fname: path of the CSV file; its first line supplies the column names.
    colExtractors: one conversion function per column, in file order.
    returns: dataframe with one row per data line, columns named after the header.
    """
    names, rows = _readHeaderAndRows(fname)

    # convert every data line into a tuple of typed values
    tRDD = rows.map(lambda r: getTupleFromSchema(_splitCsvLine(r[0]), colExtractors))

    # rename the inferred columns after the header
    return renameCols(tRDD.toDF(), names)


# Same as getDF but adds a synthetic index column, to be used if needed
def getDFWithIndex(fname, colExtractors):
    """Build a dataframe from `fname` with a leading `idx_` column.

    `idx_` is the line's index as reported by zipWithIndex; the header is
    index 0, so data lines start at 1.

    fname: path of the CSV file; its first line supplies the column names.
    colExtractors: one conversion function per CSV column (list or tuple).
    returns: dataframe with columns `idx_` followed by the header's columns.
    """
    names, rows = _readHeaderAndRows(fname)
    extractors = list(colExtractors)

    # the index is already an integer, so it is prepended directly instead of
    # being formatted into the line and parsed back out
    tRDD = rows.map(
        lambda r: (r[1],) + getTupleFromSchema(_splitCsvLine(r[0]), extractors))

    return renameCols(tRDD.toDF(), [u'idx_'] + names)

<h2> Create dataframes here </h2>

<h4> Setup data sources </h4>
<h6> Steps to follow </h6>
<ul> 
  <li> Upload files to S3 bucket
  <li> Specify the configuration below and mount it
</ul>

In [9]:
# Replace with your values
#
# NOTE: Set the access to this notebook appropriately to protect the security of your keys.
# Or you can delete this cell after you run the mount command below once successfully.

# Uncomment below to load your data the first time
#ACCESS_KEY = "REPLACE_WITH_YOUR_ACCESS_KEY"
#SECRET_KEY = "REPLACE_WITH_YOUR_SECRET_KEY"
#ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
#AWS_BUCKET_NAME = "REPLACE_WITH_YOUR_S3_BUCKET"
#MOUNT_NAME = "REPLACE_WITH_YOUR_MOUNT_NAME"

#dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

In [10]:
# Base directory under which every input file and parquet output lives
kkbox_mount = '/mnt/kkboxmount'
parquet_dir = kkbox_mount + '/parq'

# Data files
members_data                = kkbox_mount + '/members.csv'
sample_submission_zero_data = kkbox_mount + '/sample_submission_zero.csv'
train_data                  = kkbox_mount + '/train.csv'
transactions_data           = kkbox_mount + '/transactions.csv'
user_logs_part_data         = kkbox_mount + '/user_logs_part.csv'
user_logs_data              = kkbox_mount + '/user_logs.csv.gz'

# Parquet files
members_par                = parquet_dir + '/members'
sample_submission_zero_par = parquet_dir + '/sample_submission_zero'
train_par                  = parquet_dir + '/train'
transactions_par           = parquet_dir + '/transactions'
user_logs_part_par         = parquet_dir + '/user_logs_part'
user_logs_par              = parquet_dir + '/user_logs'

# Table names
members_table                = 'unmgmt_members_table'
sample_submission_zero_table = 'unmgmt_sample_submission_zero_table'
train_table                  = 'unmgmt_train_table'
transactions_table           = 'unmgmt_transactions_table'
user_logs_part_table         = 'unmgmt_user_logs_part_table'
user_logs_1_table            = 'unmgmt_user_logs_1_table'
user_logs_2_table            = 'unmgmt_user_logs_2_table'
# was 'unmgmt_user_logs_2_table' (copy-paste slip): each split needs its own table
user_logs_3_table            = 'unmgmt_user_logs_3_table'

<h5> Get a clean memDF ("DF of members") </h5>

In [12]:
# Peek at the first two raw lines of the members file before parsing it
membersRawRDD = sc.textFile(members_data)
membersRawRDD.take(2)

In [13]:
# One converter per CSV column, in file order
memberExtractors = [nothing, getInt, getInt, nothing, getInt, getDate, getDate]

# Parse the members file, then repartition the rows on the listed columns
memDF = getDF(members_data, memberExtractors)
memDF = memDF.repartition("msno", "registration_init_time", "expiration_date")
memDF.show(3)

<h5> Get a clean sampleSubZeroDF ("DF of Sample Submission") </h5>

In [15]:
# One converter per CSV column, in file order
sampleSubExtractors = [nothing, getInt]

# Parse the sample-submission file and repartition the rows on msno
sampleSubZeroDF = getDF(sample_submission_zero_data, sampleSubExtractors)
sampleSubZeroDF = sampleSubZeroDF.repartition("msno")
sampleSubZeroDF.show(5)

<h5> Get a clean trainDF ("DF of train") </h5>

In [17]:
# One converter per CSV column, in file order
trainExtractors = [nothing, getInt]

# Parse the train file and repartition the rows on msno
trainDF = getDF(train_data, trainExtractors)
trainDF = trainDF.repartition("msno")
trainDF.show(5)

<h5> Get a clean transactionsDF ("DF of transactions") </h5>

In [19]:
# One converter per CSV column, in file order
transactionExtractors = [
    nothing,
    getInt, getInt, getInt, getInt, getInt,
    getDate, getDate,
    getInt,
]

# Parse the transactions file and repartition the rows on msno
transactionsDF = getDF(transactions_data, transactionExtractors)
transactionsDF = transactionsDF.repartition("msno")
transactionsDF.show(3)

In [20]:
# One converter per CSV column, in file order
userLogPartExtractors = [
    nothing,
    getDate,
    getInt, getInt, getInt, getInt, getInt, getInt,
    getFloat,
]

# Parse the partial user-log file and repartition the rows on msno
userLogPartDF = getDF(user_logs_part_data, userLogPartExtractors)
userLogPartDF = userLogPartDF.repartition("msno")
userLogPartDF.show(3)

In [21]:
#User Logs is really large so we are going to split it into three dataframes
# Step1: Add an idx
# Step2: Create three dataframes that we will write: (idx, msno, date), (idx, num_25, num_50, num_75), (idx, num_100, num_unq, total_secs)
# Step3: Write them to S3


In [22]:
# One converter per CSV column, in file order
userLogAllExtractors = [
    nothing,
    getDate,
    getInt, getInt, getInt, getInt, getInt, getInt,
    getFloat,
]

# Parse the full (gzipped) user-log file and repartition the rows on msno.
# NOTE(review): this uses getDF, so no idx_ column is produced; the idx-based
# split described in the steps above would need getDFWithIndex -- confirm.
userLogAllDF = getDF(user_logs_data, userLogAllExtractors)
userLogAllDF = userLogAllDF.repartition("msno")
userLogAllDF.show(3)

In [23]:
#userLog_1 = userLogAllDF.select(['idx_','msno','date']).repartition("msno","idx_")
#userLog_2 = userLogAllDF.select(['idx_','msno','num_25','num_50','num_75']).repartition("msno","idx_")
#userLog_3 = userLogAllDF.select(['idx_','msno','num_100','num_unq','total_secs']).repartition("msno","idx_")

<h3> Save all these datasets as parquet files and Tables so queries are faster </h3>

In [25]:
# Write the members dataframe as snappy-compressed parquet, replacing any
# previous output at that path.
# NOTE(review): partitionBy creates one sub-directory per distinct msno value;
# if msno is a per-member key that means a very large number of tiny files --
# confirm this layout is intended.
(memDF.write
      .mode("overwrite")
      .partitionBy("msno")
      .option("compression", "snappy")
      .parquet(members_par))

In [26]:
# Register the members dataframe as a table.
# mode="overwrite" replaces an existing table, so this cell can be re-run.
# With the default mode the call fails once the table exists, although the
# parquet writes in this notebook already overwrite their output.
memDF.write.saveAsTable(members_table, mode="overwrite")

In [27]:
# Write each of the smaller dataframes as snappy-compressed parquet, partitioned
# by msno, replacing any previous output at the same path.
# NOTE(review): partitionBy creates one sub-directory per distinct msno value --
# confirm this layout is intended for a high-cardinality key.
for smallDF, parquetPath in [
    (sampleSubZeroDF, sample_submission_zero_par),
    (trainDF, train_par),
    (userLogPartDF, user_logs_part_par),
]:
    (smallDF.write
            .mode("overwrite")
            .partitionBy("msno")
            .option("compression", "snappy")
            .parquet(parquetPath))

In [28]:
# Register the smaller dataframes as tables.
# mode="overwrite" replaces existing tables, so this cell can be re-run.
# With the default mode the first call fails once its table exists.
for tableDF, tableName in [
    (sampleSubZeroDF, sample_submission_zero_table),
    (trainDF, train_table),
    (userLogPartDF, user_logs_part_table),
]:
    tableDF.write.saveAsTable(tableName, mode="overwrite")

In [29]:
# Write the transactions dataframe as snappy-compressed parquet, replacing any
# previous output at that path.
# NOTE(review): partitionBy creates one sub-directory per distinct msno value --
# confirm this layout is intended for a high-cardinality key.
(transactionsDF.write
               .mode("overwrite")
               .partitionBy("msno")
               .option("compression", "snappy")
               .parquet(transactions_par))

In [30]:
# Register the transactions dataframe as a table.
# mode="overwrite" replaces an existing table, so this cell can be re-run.
# With the default mode the call fails once the table exists.
transactionsDF.write.saveAsTable(transactions_table, mode="overwrite")

In [31]:
# Write the full user-log dataframe as snappy-compressed parquet, replacing any
# previous output at that path.
# NOTE(review): partitionBy creates one sub-directory per distinct msno value --
# confirm this layout is intended for a dataset this large.
(userLogAllDF.write
             .mode("overwrite")
             .partitionBy("msno")
             .option("compression", "snappy")
             .parquet(user_logs_par))
#userLog_1.write.parquet(user_logs_par,mode="overwrite",partitionBy="msno",compression="snappy")
#userLog_2.write.parquet(user_logs_par,mode="overwrite",partitionBy="msno",compression="snappy")
#userLog_3.write.parquet(user_logs_par,mode="overwrite",partitionBy="msno",compression="snappy")

In [32]:
#userLog_1.write.saveAsTable(user_logs_1_table)
#userLog_2.write.saveAsTable(user_logs_2_table)
#userLog_3.write.saveAsTable(user_logs_3_table)