In [1]:
import findspark
import os
findspark.init()
import pyspark
from pyspark import SparkFiles
from pyspark import sql
from pyspark import SparkConf

from pyspark.sql import SQLContext, HiveContext
from pyspark import SparkContext

In [2]:
from pyspark.sql import Row
from pyspark.sql import DataFrameWriter
from pyspark.sql import DataFrameReader
from pyspark.sql import GroupedData

from pyspark import StorageLevel

from pyspark.sql import functions as F
from pyspark.sql.functions import struct
from pyspark.sql import GroupedData

from pyspark.ml.feature import VectorAssembler, VectorIndexer

In [3]:
from functools import partial
import collections
import numpy as np

from datasu.auc import *

from datasu.dicts import *
from datasu.files import *
from datasu.pandas import *
from datasu.persist import *
from datasu.spark import *

import pandas as pd

In [4]:
# Spark configuration for this notebook's application; values set here are
# echoed back by conf.getAll() at the end of the cell.
conf = SparkConf()
conf.set('spark.driver.memory', '12g')
conf.set('spark.python.worker.memory', '2g')
conf.set("spark.driver.maxResultSize", "5g")
# NOTE(review): confirm 'spark.executor.max' is a key Spark actually reads - the
# similarly named documented settings are spark.executor.instances,
# spark.executor.cores and spark.cores.max.
conf.set("spark.executor.max", 3)
conf.set('spark.executor.memory', '5g')
conf.set("spark.cores.max", 28)
# NOTE(review): presumably a worker-side (standalone cluster) setting; confirm it
# has any effect when set from the application configuration.
conf.set('spark.worker.cleanup.enabled', True)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set('spark.executor.extraJavaOptions', '-XX:+PrintGCDetails -XX:+UseCompressedOops')

conf.setAppName('prepare features')
# Last expression of the cell: displays the effective (key, value) pairs.
conf.getAll()

[(u'spark.master', u'spark://spark1.ea.lab:7077'),
 (u'spark.executor.max', u'3'),
 (u'spark.driver.memory', u'12g'),
 (u'spark.submit.pyFiles',
  u'/home/ds/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,/home/ds/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,/home/ds/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar'),
 (u'spark.jars',
  u'file:/home/ds/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,file:/home/ds/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,file:/home/ds/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar'),
 (u'spark.executor.memory', u'5g'),
 (u'spark.app.name', u'prepare features'),
 (u'spark.driver.maxResultSize', u'5g'),
 (u'spark.files',
  u'file:/home/ds/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,file:/home/ds/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,file:/home/ds/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar'),
 (u'spark.serializer', u'org.apache.spark.serializer.KryoSerializer'),
 (u'spark.cores.max', u'28'),
 (u's

In [29]:
# Stop a SparkContext left over from an earlier run of this notebook, if any,
# so that the next cell can create a fresh one.
try:
    sc.stop()
except NameError:
    # `sc` has not been created in this kernel session yet, so there is nothing
    # to stop. Any other failure of stop() is a real error and is not swallowed.
    print('spark context not exists')

In [6]:
   
sc = pyspark.SparkContext(conf=conf)
sqc = pyspark.SQLContext(sc)
# shq = HiveContext(sc)

In [7]:
sc.defaultParallelism, sc.defaultMinPartitions

(2, 2)

In [8]:
csv_reader = sqc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true')

## LOAD DATA

In [9]:
base_data_path = '/home/ds/dev/data/Kagle-ValuesShoppers/'
spark_data_path = 'file://'+ base_data_path + 'spark_data/'
transactions_name = 'transactions'

In [10]:
df_coupons = pd.read_csv(base_data_path+'offers')[['offer','category','company','brand','offervalue','quantity']]
df_offers_ids = pd.read_csv(base_data_path+'trainHistory').rename(columns={'id': 'customer_id'})
df_offers_ids_subm = pd.read_csv(base_data_path+'testHistory').rename(columns={'id': 'customer_id'})
# df_trans_all = pd.read_csv(base_data_path+'transactions_reduced_category').rename(columns={'id': 'customer_id'})

In [11]:
df_offers_hist = pd.merge(df_offers_ids, df_coupons, on=['offer'])
df_offers_hist = df_offers_hist[['customer_id','chain','offer','market','category','company','brand','offerdate','offervalue','quantity','repeattrips','repeater']]

df_offers_subm = pd.merge(df_offers_ids_subm, df_coupons, on=['offer'])
df_offers_subm = df_offers_subm[['customer_id','chain','offer','market','category','company','brand','offerdate','offervalue','quantity']]

In [12]:
# Load the raw transactions file through the spark-csv reader configured above
# (header row, inferred schema).
ddf_transactions = csv_reader.load(base_data_path+transactions_name, samplingRatio=0.02)
ddf_transactions.rdd.setName(transactions_name)
# NOTE(review): the value returned by alias() is not assigned. If alias() returns
# a new DataFrame instead of modifying this one, the line has no effect - confirm.
ddf_transactions.alias('transactions')

# Not the last expression of the cell, so this partition count is not displayed.
ddf_transactions.rdd.getNumPartitions()
# Use the same key name as the offers tables ('id' -> 'customer_id').
ddf_transactions = ddf_transactions.withColumnRenamed('id','customer_id')

In [13]:
ddf_transactions_small, ddf_transactions_big = ddf_transactions.randomSplit([0.05,0.95])

In [14]:
ddf_trans = ddf_transactions_small
ddf_trans.cache()

DataFrame[customer_id: bigint, chain: int, dept: int, category: int, company: bigint, brand: int, date: string, productsize: double, productmeasure: string, purchasequantity: int, purchaseamount: double]

In [15]:
ddf_trans.count()

17484788

In [16]:
cat_cols = ['chain','market','category','company','brand']
num_cols = ['offervalue','quantity']

## EXPLORE DATA

In [None]:
ddf_transactions.show(5)

In [None]:
ddf_transactions.select('dept').distinct().count()

In [None]:
ddf_transactions.select('category').distinct().count()

In [None]:
ddf_transactions.select('company').distinct().count()

In [None]:
ddf_transactions.select('brand').distinct().count()

In [None]:
ddf_transactions.select('customer_id').distinct().count()

## PREPARE DATA

In [None]:
ddf_offers_hist = sqc.createDataFrame(df_offers_hist)
ddf_offers_subm = sqc.createDataFrame(df_offers_subm)

In [None]:
ddf_offers_hist_ids = ddf_offers_hist.select('customer_id')
ddf_offers_all_ids = ddf_offers_hist_ids.unionAll(ddf_offers_subm.select('customer_id')).distinct()

ddf_transactions = ddf_transactions.join(ddf_offers_all_ids, on='customer_id', how='leftsemi')

In [None]:
ddf_transactions = ddf_transactions.repartition(12000)
ddf_transactions.rdd.getNumPartitions()


## PREPARE FEATURES

In [None]:
summ_grouping = {'total':F.sum, 'average':F.avg }
count_grouping = {'count':F.count }

# count_agg = partial(get_ddf_aggs, agg_columns=['customer_id'], agg_funcs=count_grouping, prefix='agg_')
# total_avg_agg = partial(get_ddf_aggs, agg_columns=['productsize','purchasequantity','purchaseamount'], agg_funcs=summ_grouping, prefix='agg_')

In [None]:
# grpby_columns = ['customer_id','brand', 'category', 'dept']
grpby_columns = ['customer_id','category']
grpby_columns_name = ['customer_id','brand']


## effective pivot

In [17]:
import pyspark.sql.types as sql_types
# import pyspark.sql.

from pyspark.mllib.linalg import *
from pyspark.mllib.linalg.distributed import *
from pyspark.mllib.linalg import VectorUDT, MatrixUDT, Vectors, Matrices

In [18]:
def get_ddf_aggs(grpby_columns, agg_columns, agg_funcs, prefix=None, suffix=None, cast_to=None, return_columns_names=False):
    """
    Builds aliased aggregation expressions for a spark dataframe ``groupBy(...).agg(...)`` call.

    Every function of ``agg_funcs`` is applied to every column of ``agg_columns``; each result is
    optionally cast and then aliased as
    ``<prefix><grpby_columns joined by '_'>_<column>_<aggregation name>[_<suffix>]``.

    :param grpby_columns: columns the data frame is grouped by (only used to build the alias): ['id','brand']
    :param agg_columns: columns to aggregate: ['productsize','purchasequantity']
    :param agg_funcs: aggregations dict to enable on agg_columns: { 'total':F.sum, 'average':F.avg }
    :param prefix: string put directly (without separator) in front of the alias, e.g. 'agg_';
                   None or '' adds nothing
    :param suffix: optional string appended to the alias with a '_' separator
    :param cast_to: cast aggregation result column to type (e.g. cast_to='double')
    :param return_columns_names: when True, return ``(aggregations, aliases)`` instead of only the aggregations
    :return [Column<avg(productsize) AS id_brand_productsize_average#59>,
             Column<sum(productsize) AS id_brand_productsize_total#60>,
             Column<avg(purchasequantity) AS id_brand_purchasequantity_average#61>,
             Column<sum(purchasequantity) AS id_brand_purchasequantity_total#62>]
            (plus the list of alias names when ``return_columns_names`` is True)

    Example:

    total_avg_agg = partial(get_ddf_aggs, agg_columns=['productsize','purchasequantity'],
                                     agg_funcs={'total':F.sum, 'average':F.avg}, prefix='agg_')

    grpby_columns = ['customer_id','brand']

    ddf_trans_grp_customer_brand = ddf_trans.groupBy(grpby_columns)
                                            .agg(*total_avg_agg(grpby_columns))
    """
    aggs = []
    col_names = []
    # prefix defaults to None; treat it as "no prefix" instead of failing on None + str.
    col_prefix = (prefix or '') + '_'.join(grpby_columns)
    for col in agg_columns:
        for agg_name, agg_func in agg_funcs.items():
            agg_f = agg_func(col)
            if cast_to:
                agg_f = agg_f.cast(cast_to)
            # Empty / None parts (e.g. a missing suffix) are left out of the alias.
            alias = "_".join([s for s in [col_prefix, col, agg_name, suffix] if s])
            aggs.append(agg_f.alias(alias))
            col_names.append(alias)

    if return_columns_names:
        return aggs, col_names
    return aggs

In [19]:
total_avg_agg = partial(get_ddf_aggs, agg_columns=['productsize'], agg_funcs={'total':F.sum}, prefix='agg_', cast_to='double', return_columns_names=True)

In [20]:
cols = ['customer_id', 'category']
agg, agg_column = total_avg_agg(cols)

In [None]:
agg, agg_column

In [None]:
ddf_trans.columns

In [None]:
aggregated = ddf_trans.groupBy(*map(lambda c: F.col(c),cols)).agg(*agg)

In [None]:
aggregated.columns

In [None]:
aggregated.show(5)

In [None]:
from pyspark.ml.feature import StringIndexer

indexers = map(lambda c: StringIndexer(inputCol=c, outputCol='%s_idx' % c).fit(aggregated), ['category']) 

In [None]:
aggregated = reduce(lambda ddf,t: t.transform(ddf), indexers, aggregated)

In [None]:
aggregated.columns

In [None]:
aggregated.printSchema()

In [None]:
def columns_to_tuple(*values):
    """
    Creates a UDF named 'columns_to_tuple' that renders the column values passed
    to it as the string form of a tuple (declared result type: StringType).

    :param values: accepted but not used by this factory
    :return: UserDefinedFunction to be applied to one or more columns
    """
    stringify = lambda *cells: str(tuple(cells))
    return UserDefinedFunction(stringify, StringType(), 'columns_to_tuple')

In [None]:
aggregated2 = aggregated.withColumn('tuple', columns_to_tuple()(aggregated.category_idx, aggregated.agg_customer_id_category_productsize_average))

In [None]:
aggregated2.select('tuple').first()

In [None]:
aggregated2_gr_cust = aggregated2.groupBy('customer_id_idx')

In [None]:
aggregated2_gr_cust_agg = aggregated2_gr_cust.agg(F.collect_list(F.col('tuple')))

In [None]:
aggregated2_gr_cust_agg.printSchema()

In [None]:
def join_str(*values):
    """
    Creates a UDF named 'join_str' that joins the items of its single argument
    with ',' (declared result type: StringType).

    :param values: accepted but not used by this factory
    :return: UserDefinedFunction to be applied to one column
    """
    def comma_join(arr):
        return ','.join(arr)

    return UserDefinedFunction(comma_join, StringType(), 'join_str')

In [None]:
aggregated2_gr_cust_agg = aggregated2_gr_cust_agg.withColumn('tuples',join_str()(aggregated2_gr_cust_agg['collect_list(tuple)']))

In [None]:
aggregated2_gr_cust_agg.first()

In [None]:
Vectors.parse()

In [None]:
# idea: use Vectors.parse(' ( 100,  [0],  [2])')
# 1. select max category_idx
# 2. collect_list for category_idx, agg_customer_id_category_productsize_total
# 3. join_str for each of the results, to get the two arrays as strings in two separate columns
# 4. use F.concat_ws(',',F.lit(max index), string of index array, string of values array)

In [None]:
pivot_cols = map(lambda c: F.col('%s_idx'%c).cast('long'),cols) + [F.col('agg_customer_id_category_productsize_total').cast('double')]

In [None]:
pivot_cols

In [None]:
indexed = aggregated.select(pivot_cols)

In [None]:
indexed.select('category_idx').rdd.max()[0]

In [None]:
indexed2 = indexed.withColumn('vector', F.concat_ws(':',indexed_gr_cust_agg['customer_id_idx']))

In [None]:
indexed.show(5)

In [None]:
indexed_gr_cust = indexed.groupBy('customer_id_idx')

In [None]:
indexed_gr_cust_agg.columns

In [None]:
Vectors.parse()

In [None]:
def to_sparse(indices, values):
    """
    Placeholder for building a sparse vector from index / value collections.

    Both arguments are currently ignored: the function always returns the same
    fixed sparse vector of size 4 with entries {1: 3.0, 3: 4.0}.
    """
    # TODO: build the vector from `indices` and `values` (requires the vector size).
    return Vectors.sparse(4, [1, 3], [3.0, 4.0])


# The UDF return type has to be a DataType *instance*; to_sparse returns an MLlib
# vector, so VectorUDT() is used instead of the bare UserDefinedType class.
collect_to_sparse_vector = UserDefinedFunction(to_sparse, VectorUDT(), 'collect_to_sparse_vector')

In [None]:
vals = F.col('agg_customer_id_category_productsize_total')
inds = F.col('category_idx')

# indexed_gr_cust_agg = indexed_gr_cust.agg(F.UserDefinedFunction(c1),F.collect_list(c2))
indexed_gr_cust_agg = indexed_gr_cust.agg(collect_to_sparse_vector(inds, vals))

In [None]:
r = indexed_gr_cust_agg.first()

In [None]:
Vectors.sparse()

In [None]:
_convert_to_vector()

In [None]:
r = indexed.first()

In [None]:
r

In [None]:
indexed

In [None]:
filter_columns('.*productsize.*' ,aggregated)

In [None]:
def get():
    """Returns the fixed pair (1, 2)."""
    pair = (1, 2)
    return pair

a = get()
a

In [None]:
aggregated.select(a)

In [None]:
aggregated.columns

In [None]:
list(set(aggregated.columns)-set(['category', 'category_idx', 'customer_id']))[0]

In [None]:
from pyspark.mllib.linalg.distributed import CoordinateMatrix, IndexedRowMatrix
cm = CoordinateMatrix(
    aggregated.map(lambda r: (r['customer_id'], r.category_idx, r.agg_customer_id_category_productsize_total))
)

In [None]:
irm = cm.toIndexedRowMatrix()

In [None]:
irm.rows.first()

In [None]:
ddf_irm = irm.rows.toDF()

In [None]:
ddf_irm.withColumnRenamed('index', 'customer_id')

In [None]:
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2)])
sqc.createDataFrame(rdd, ["id", "score"])

In [None]:
rm.rows.count()

In [None]:
ddf_trans.show(2)

In [21]:
from pyspark.ml.feature import StringIndexer
# def aggregate(ddf, grpby_columns, aggs):
#     aggregated = ddf.groupBy(*map(lambda c: F.col(c),grpby_columns)).agg(*aggs)
#     return aggregated
    
def index_columns(ddf, index_columns, index_col_suffix='_idx'):
    """
    Adds a StringIndexer output column for each of the given columns.

    :param ddf: data frame to index
    :param index_columns: names of the columns to index
    :param index_col_suffix: suffix appended to a column name to form its index column name
    :return: ddf with one additional '<column><suffix>' column per indexed column
    """
    # Every indexer is fitted on the input frame before any transformation is applied.
    fitted = [
        StringIndexer(inputCol=name, outputCol='%s%s' % (name, index_col_suffix)).fit(ddf)
        for name in index_columns
    ]
    indexed = ddf
    for model in fitted:
        indexed = model.transform(indexed)
    return indexed

In [28]:
def aggregate_pivot_to_sparse_vector(ddf, id_column, pivot_column, aggs, vector_column_name='features'):
    """
    Aggregates ``ddf`` by (id_column, pivot_column) and pivots every aggregation
    into a vector per ``id_column`` value, using the string index of
    ``pivot_column`` as the position inside the vector.

    :param ddf: data frame to aggregate
    :param id_column: column identifying a result row; its values are converted with long()
                      and used as matrix row indices
    :param pivot_column: column whose indexed values ('<pivot_column>_idx') become vector positions
    :param aggs: aggregation expressions passed to ``groupBy(...).agg(*aggs)``
    :param vector_column_name: name of the resulting vector column
    :return: data frame with ``id_column`` and ``vector_column_name``; when there are several
             aggregations their vectors are concatenated by a VectorAssembler
    """
    from pyspark.mllib.linalg.distributed import CoordinateMatrix

    index_col_suffix = '_idx'
    grpby_columns = [id_column, pivot_column]

    aggregated = ddf.groupBy(grpby_columns).agg(*aggs)

    pivot_indexed_column = pivot_column + index_col_suffix
    # NOTE: built from a set, so the order of the aggregations (and therefore of the
    # sub-vectors in the assembled output) follows set iteration order.
    agg_column_names = list(set(aggregated.columns) - set([id_column, pivot_column, pivot_indexed_column]))

    indexed = index_columns(ddf=aggregated, index_columns=[pivot_column], index_col_suffix=index_col_suffix)

    res = None
    agg_columns_vectors = [c + '_vector' for c in agg_column_names]
    for agg_column, agg_column_vector in zip(agg_column_names, agg_columns_vectors):
        print('%s %s' % (agg_column, agg_column_vector))

        # One matrix entry per aggregated row: (id, pivot index, aggregated value).
        # agg_column is bound as a default argument so the lambda does not depend on
        # the loop variable's later values.
        cm = CoordinateMatrix(
            indexed.rdd.map(lambda r, agg_column=agg_column:
                            (long(r[id_column]), long(r[pivot_indexed_column]), r[agg_column]))
        )
        irm = cm.toIndexedRowMatrix()
        ddf_irm = irm.rows.toDF()
        ddf_irm = ddf_irm.withColumnRenamed('index', id_column).withColumnRenamed('vector', agg_column_vector)

        # Explicit None check: the truth value of a data frame is not a meaningful test.
        if res is None:
            res = ddf_irm
        else:
            res = res.join(ddf_irm, on=id_column, how='inner')

    if len(agg_columns_vectors) > 1:
        assembler = VectorAssembler(inputCols=agg_columns_vectors, outputCol=vector_column_name)
        res = assembler.transform(res)
    else:
        res = res.withColumnRenamed(agg_columns_vectors[0], vector_column_name)

    res = drop_columns(res, columns=agg_columns_vectors)
    return res

In [23]:
def merge_features(ddfs, join_column, merge_column, output_column='features', drop_merged_columns=True):
    """
    Inner-joins several data frames on ``join_column`` and assembles their
    ``merge_column`` columns into one vector column.

    The ``merge_column`` of the i-th data frame is renamed to '<merge_column><i>'
    before joining so the columns do not clash.

    :param ddfs: non-empty list of data frames; the list itself is not modified
    :param join_column: column (or columns) to join on
    :param merge_column: name of the column to merge, present in every data frame
    :param output_column: name of the assembled vector column
    :param drop_merged_columns: drop the renamed '<merge_column><i>' columns from the result
    :return: joined data frame with the assembled ``output_column``
    :raises IndexError: if ``ddfs`` is empty
    """
    # Work on a copy: the caller's list must not be consumed by this function.
    frames = list(ddfs)
    first, rest = frames[0], frames[1:]

    merge_column_renamed = merge_column + str(0)
    merge_columns = [merge_column_renamed]
    ddf_res = first.withColumnRenamed(merge_column, merge_column_renamed)

    for i, ddf in enumerate(rest):
        merge_column_renamed = merge_column + str(i + 1)
        merge_columns.append(merge_column_renamed)
        ddf_r = ddf.withColumnRenamed(merge_column, merge_column_renamed)
        ddf_res = ddf_res.join(ddf_r, on=join_column, how='inner')

    assembler = VectorAssembler(inputCols=merge_columns, outputCol=output_column)
    res = assembler.transform(ddf_res)

    if drop_merged_columns:
        res = drop_columns(res, columns=merge_columns)

    return res

In [24]:
total_avg_agg = partial(get_ddf_aggs, agg_columns=['productsize','purchasequantity'], agg_funcs={'total':F.sum}, prefix='agg_', cast_to='double')

In [25]:
cols = ['customer_id', 'category']
aggs = total_avg_agg(cols)
aggs

[Column<cast((sum(productsize),mode=Complete,isDistinct=false) as double) AS agg_customer_id_category_productsize_total#127>,
 Column<cast((sum(purchasequantity),mode=Complete,isDistinct=false) as double) AS agg_customer_id_category_purchasequantity_total#128>]

In [26]:
ddf_pivot1 = aggregate_pivot_to_sparse_vector(ddf_trans, id_column='customer_id', 
                                              pivot_column='category', 
                                              aggs=total_avg_agg(['customer_id', 'category']))
                                              

agg_customer_id_category_purchasequantity_total agg_customer_id_category_purchasequantity_total_vector
agg_customer_id_category_productsize_total agg_customer_id_category_productsize_total_vector


In [27]:
ddf_pivot1.first()

Row(customer_id=98468631, features=SparseVector(1666, {0: 1.0, 1: 1.0, 5: 1.0, 8: 2.0, 13: 1.0, 19: 1.0, 22: 1.0, 24: 2.0, 27: 1.0, 28: 1.0, 46: 1.0, 50: 4.0, 54: 1.0, 55: 3.0, 60: 11.0, 75: 1.0, 85: 1.0, 98: 1.0, 133: 2.0, 144: 3.0, 156: 1.0, 163: 1.0, 165: 2.0, 166: 1.0, 208: 1.0, 226: 1.0, 284: 1.0, 287: 2.0, 400: 2.0, 401: 2.0, 478: 1.0, 485: 1.0, 512: 1.0, 833: 22.0, 834: 64.0, 838: 12.0, 841: 30.5, 846: 10.0, 852: 128.0, 855: 1.0, 857: 22.0, 860: 240.0, 861: 2.0, 879: 24.0, 883: 24.0, 887: 73.28, 888: 600.0, 893: 32.0, 908: 16.0, 918: 32.0, 931: 20.0, 966: 12.0, 977: 13.3, 989: 3.5, 996: 6.5, 998: 18.0, 999: 144.0, 1041: 16.9, 1059: 4.25, 1117: 10.0, 1120: 44.0, 1233: 1.0, 1234: 20.0, 1311: 70.0, 1318: 9.0, 1345: 4.0}))

In [None]:
total_avg_agg(['customer_id', 'brand'])

In [None]:
ddf_pivot2 = aggregate_pivot_to_sparse_vector(ddf_trans, id_column='customer_id', 
                                              pivot_column='brand', 
                                              aggs=total_avg_agg(['customer_id', 'brand']))                                              

In [None]:
ddf_pivot2.first()

In [None]:
ddf_pivot12 = ddf_pivot1.join(ddf_pivot2, on='customer_id')

In [None]:
ddf_pivot12.first()

In [None]:
assembler = VectorAssembler( inputCols=["category_features", "brand_features"], outputCol="features")

In [None]:
output = assembler.transform(ddf_pivot12)

In [None]:
output.first()

In [None]:
res = merge_features(ddfs=[ddf_pivot1,ddf_pivot2], join_column='customer_id', merge_column='features')

In [None]:
# Inspect the first row of the merged features (`res` is assigned by the merge_features call).
res.first()

### pivot agg customer_category

In [None]:
ddf_agg_customer_category = pivot_aggregate(ddf_transactions, grpby_columns=['customer_id','category'],
                                            aggs=count_agg(grpby_columns),
                                            pivot_column='category', pivot_filter_support=None)

In [None]:
ddf_agg_customer_category.columns

In [None]:
ddf_agg_customer_category.rdd.setName('ddf_agg_customer_category') \
                         .persist(StorageLevel.MEMORY_AND_DISK_SER)

## merge with offers history

In [None]:
ddf_agg_customer_category = rename_columns(ddf_agg_customer_category, prefix = 'left', separator='.', columns=cat_cols)

In [None]:
assemble_columns = list(set(ddf_agg_customer_category.columns) -set(filter_columns('left.*',ddf_agg_customer_category)) \
                    -set(ddf_offers_hist.columns) )+ cat_cols+num_cols

In [None]:
vecAssembler = VectorAssembler(inputCols=assemble_columns, outputCol="features")

#### train

In [None]:
ddf_offers__ohagg_cid_category = ddf_offers_hist.join(ddf_agg_customer_category,
                                                     on=['customer_id'], how='left_outer')

In [None]:
ddf_offers__ohagg_cid_category = vecAssembler.transform(ddf_offers__ohagg_cid_category) \
                                             .select(['customer_id','features', 'repeater']+cat_cols)

In [None]:
write_ddf_to_csv(ddf_offers__ohagg_cid_category, spark_data_path+'ddf_offers__ohagg_cid_category')

In [None]:
ddf_offers__ohagg_cid_category.show(3)

#### submission

In [None]:
ddf_offers_subm__ohagg_cid_category = ddf_offers_subm.join(ddf_agg_customer_category,
                                                      on=['customer_id'], how='left_outer')

In [None]:
ddf_offers_subm__ohagg_cid_category = vecAssembler.transform(ddf_offers_subm__ohagg_cid_category) \
                                    .select(['customer_id', 'features', F.lit('f').alias('repeater')]+cat_cols)

In [None]:
write_ddf_to_csv(ddf_offers_subm__ohagg_cid_category, spark_data_path+'ddf_offers_subm__ohagg_cid_category')


#### persist

In [None]:
def pivot_aggregate(ddf, grpby_columns, pivot_column, aggs, pivot_filter_values=None, pivot_filter_support=None):
    """
    Groups ``ddf`` by ``grpby_columns``, pivots on ``pivot_column`` and applies ``aggs``.

    :param ddf: data frame to aggregate
    :param grpby_columns: columns to group by
    :param pivot_column: column whose values become the pivoted columns
    :param aggs: aggregation expressions passed to ``agg(*aggs)``
    :param pivot_filter_values: explicit pivot values; takes precedence over ``pivot_filter_support``
    :param pivot_filter_support: when given (and no explicit values are), the pivot values are the
                                 frequent items of ``pivot_column`` found with this support,
                                 converted to strings
    :return: the aggregated, pivoted data frame
    """
    use_frequent_items = pivot_filter_support and not pivot_filter_values
    if use_frequent_items:
        freq_row = ddf.freqItems([pivot_column], support=pivot_filter_support).first()
        frequent = freq_row.asDict()[pivot_column + '_freqItems']
        pivot_filter_values = [str(item) for item in frequent]

    return ddf.groupBy(*grpby_columns).pivot(pivot_column, pivot_filter_values).agg(*aggs)
    

In [None]:
def rename_columns(df, prefix='', suffix='', separator='_', columns=None):
    """
    Returns a copy of ``df`` with the selected columns renamed to
    '<prefix><separator><column><separator><suffix>'.

    :param df: data frame whose columns are renamed
    :param prefix: text put in front of the column name; the separator is only added when it is non-empty
    :param suffix: text put after the column name; the separator is only added when it is non-empty
    :param separator: text placed between prefix / suffix and the column name
    :param columns: names of the columns to rename; all columns of ``df`` when None
    :return: new data frame obtained through ``df.select('*')`` and successive renames
    """
    if prefix:
        prefix = prefix + separator
    if suffix:
        suffix = separator + suffix
    if columns is None:
        columns = df.columns

    renamed = df.select('*')
    for name in columns:
        renamed = renamed.withColumnRenamed(name, prefix + name + suffix)
    return renamed


def filter_columns(expr, df):
    """
    Returns the column names of ``df`` that match the regular expression ``expr``.

    Matching uses ``re.match``, i.e. the pattern is anchored at the start of the name.

    :param expr: regular expression to test each column name against
    :param df: object exposing a ``columns`` list of names
    :return: list of matching column names, in their original order
    """
    import re
    return [name for name in df.columns if re.match(expr, name)]