In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import Row
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import col, udf, unix_timestamp

In [2]:
sc.getConf().getAll()

[(u'spark.driver.host', u'192.168.42.75'),
 (u'spark.driver.port', u'55414'),
 (u'spark.sql.catalogImplementation', u'hive'),
 (u'spark.driver.memory', u'4g'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.executor.memory', u'4g'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.master', u'local[*]'),
 (u'spark.executor.id', u'driver'),
 (u'spark.submit.deployMode', u'client'),
 (u'hive.metastore.warehouse.dir',
  u'file:/Users/rogerwu/Github/spark_rads/notebooks/spark-warehouse'),
 (u'spark.app.name', u'PySparkShell'),
 (u'spark.app.id', u'local-1484552566127')]

In [3]:
file_name = '../data/transactions.csv'
chunksize = 10 ** 5
chunk_iter = pd.read_csv(file_name, chunksize=chunksize)

In [4]:
chunk1 = chunk_iter.next()
# Convert to string
chunk1['date'] = chunk1['date'].astype(str)
chunk1['productmeasure'] = chunk1['productmeasure'].astype(str)
chunk1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 11 columns):
id                  100000 non-null int64
chain               100000 non-null int64
dept                100000 non-null int64
category            100000 non-null int64
company             100000 non-null int64
brand               100000 non-null int64
date                100000 non-null object
productsize         100000 non-null float64
productmeasure      100000 non-null object
purchasequantity    100000 non-null int64
purchaseamount      100000 non-null float64
dtypes: float64(2), int64(7), object(2)
memory usage: 8.4+ MB


In [5]:
schema1 = StructType([
    StructField("id", LongType(), False),
    StructField("chain", LongType(), True),
    StructField("dept", LongType(), True),
    StructField("category", LongType(), True),
    StructField("company", LongType(), True),
    StructField("brand", LongType(), True),
    StructField("date", StringType(), True),
    StructField("productsize", DoubleType(), True),
    StructField("purchasemeasure", StringType(), True),
    StructField("purchasequantity", LongType(), True),
    StructField("purchaseamount", DoubleType(), True)
    ])

In [6]:
transactions = sqlContext.createDataFrame(chunk1, schema1)

In [7]:
# Convert date column to a date type
func = udf(lambda x: datetime.strptime(x, '%Y-%M-%d'), DateType())
transactions = transactions.withColumn('date', func(col('date')))

In [8]:
transactions.printSchema()

root
 |-- id: long (nullable = false)
 |-- chain: long (nullable = true)
 |-- dept: long (nullable = true)
 |-- category: long (nullable = true)
 |-- company: long (nullable = true)
 |-- brand: long (nullable = true)
 |-- date: date (nullable = true)
 |-- productsize: double (nullable = true)
 |-- purchasemeasure: string (nullable = true)
 |-- purchasequantity: long (nullable = true)
 |-- purchaseamount: double (nullable = true)



In [9]:
transactions.show()

+-----+-----+----+--------+----------+-----+----------+-----------+---------------+----------------+--------------+
|   id|chain|dept|category|   company|brand|      date|productsize|purchasemeasure|purchasequantity|purchaseamount|
+-----+-----+----+--------+----------+-----+----------+-----------+---------------+----------------+--------------+
|86246|  205|   7|     707|1078778070|12564|2012-01-02|       12.0|             OZ|               1|          7.59|
|86246|  205|  63|    6319| 107654575|17876|2012-01-02|       64.0|             OZ|               1|          1.59|
|86246|  205|  97|    9753|1022027929|    0|2012-01-02|        1.0|             CT|               1|          5.99|
|86246|  205|  25|    2509| 107996777|31373|2012-01-02|       16.0|             OZ|               1|          1.99|
|86246|  205|  55|    5555| 107684070|32094|2012-01-02|       16.0|             OZ|               2|         10.38|
|86246|  205|  97|    9753|1021015020|    0|2012-01-02|        1.0|     

Get number of refund transactions

In [10]:
import pyspark.sql.functions as F

returns = transactions.select('id',
                              F.when(transactions['purchaseamount'] < 0, 1).otherwise(0).alias('return'))\
                      .withColumn('1', F.lit(1))
returns = returns.groupBy('id').agg(F.sum('1').alias('total_trans'), F.sum("return").alias('total_returns'))
returns.show()

+--------+-----------+-------------+
|      id|total_trans|total_returns|
+--------+-----------+-------------+
|18470775|        350|            8|
|14723452|        755|            5|
|15738658|         39|            0|
|17552659|        591|            4|
|12996040|        326|            5|
|16078766|        966|           54|
|18249735|       1557|           60|
|14989775|        614|           39|
|15073302|        526|           38|
|16075389|        591|           32|
|16606739|        678|            6|
|15705695|        431|            8|
|14576147|        817|           66|
|15134033|        944|           13|
|16551772|       1699|           47|
|17652157|       1407|           15|
|13089312|       1218|           52|
|13744500|       2232|          112|
|16829614|        738|           10|
|17524817|        328|            7|
+--------+-----------+-------------+
only showing top 20 rows



# Training data

In [11]:
train = pd.read_csv('../data/trainHistory.csv')
offers = pd.read_csv('../data/offers.csv')

In [12]:
# Add department to offers
dept = []
for i in range(offers.shape[0]):
    str_category = str(offers['category'].iloc[i])
    if len(str_category) == 4:
        dept.append(str_category[:2])
    else:
        dept.append(str_category[:1])
offers['dept'] = dept
offers['dept'] = pd.to_numeric(offers['dept'])

In [13]:
train = pd.merge(train, offers, how='left', on=['offer'])

In [14]:
# Rename columns
train.columns = ['id', 'chain', 'offer', 'market', 'repeattrips', 'repeater', 'offerdate', 
                 'offer_category', 'quantity', 'offer_company', 'offervalue', 'offer_brand',
                 'offer_dept']

In [15]:
# Reencode the target variable
train['repeater'] = np.where(train['repeater'] == 't', '1', '0')
train['repeater'] = pd.to_numeric(train['repeater'])

In [16]:
train['offerdate'] = train['offerdate'].astype(str)

In [17]:
train.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 160057 entries, 0 to 160056
Data columns (total 13 columns):
id                160057 non-null int64
chain             160057 non-null int64
offer             160057 non-null int64
market            160057 non-null int64
repeattrips       160057 non-null int64
repeater          160057 non-null int64
offerdate         160057 non-null object
offer_category    160057 non-null int64
quantity          160057 non-null int64
offer_company     160057 non-null int64
offervalue        160057 non-null float64
offer_brand       160057 non-null int64
offer_dept        160057 non-null int64
dtypes: float64(1), int64(11), object(1)
memory usage: 17.1+ MB


In [18]:
train_schema = StructType([
    StructField("id", LongType(), False),
    StructField("offer_chain", LongType(), True),
    StructField("offer", LongType(), True),
    StructField("market", LongType(), True),
    StructField("repeattrips", LongType(), True),
    StructField("repeater", IntegerType(), True),
    StructField("offerdate", StringType(), True),
    StructField("offer_category", LongType(), True),
    StructField("quantity", LongType(), True),
    StructField("offer_company", LongType(), True),
    StructField("offervalue", DoubleType(), True),
    StructField("offer_brand", LongType(), True),
    StructField("offer_dept", LongType(), True)
    ])

In [19]:
train_sql = sqlContext.createDataFrame(train, train_schema)

In [20]:
# Convert offerdate to date type
train_sql = train_sql.withColumn('offerdate', func(col('offerdate')))

In [21]:
train_sql.show()

+--------+-----------+-------+------+-----------+--------+----------+--------------+--------+-------------+----------+-----------+----------+
|      id|offer_chain|  offer|market|repeattrips|repeater| offerdate|offer_category|quantity|offer_company|offervalue|offer_brand|offer_dept|
+--------+-----------+-------+------+-----------+--------+----------+--------------+--------+-------------+----------+-----------+----------+
|   86246|        205|1208251|    34|          5|       1|2013-01-24|          2202|       1|    104460040|       2.0|       3718|        22|
|   86252|        205|1197502|    34|         16|       1|2013-01-27|          3203|       1|    106414464|      0.75|      13474|        32|
|12682470|         18|1197502|    11|          0|       0|2013-01-28|          3203|       1|    106414464|      0.75|      13474|        32|
|12996040|         15|1197502|     9|          0|       0|2013-01-25|          3203|       1|    106414464|      0.75|      13474|        32|
|13089

In [22]:
from pyspark.ml.feature import OneHotEncoder

# One-hot-encoding of offer category
encoder = OneHotEncoder(inputCol="offer", outputCol="categoryVec")
train_sql = encoder.transform(train_sql)

In [23]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["categoryVec"], outputCol="features")
train_sql = train_sql.withColumnRenamed('repeater', 'label')

In [24]:
output = assembler.transform(train_sql)
output.cache()

DataFrame[id: bigint, offer_chain: bigint, offer: bigint, market: bigint, repeattrips: bigint, label: int, offerdate: date, offer_category: bigint, quantity: bigint, offer_company: bigint, offervalue: double, offer_brand: bigint, offer_dept: bigint, categoryVec: vector, features: vector]

In [24]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10)

In [25]:
# Fit the model
lrModel = lr.fit(output)

In [26]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: (1208503,[1194044,1197502,1198271,1198272,1198273,1198274,1198275,1199256,1199258,1200578,1200579,1200581,1200582,1200584,1200988,1203052,1204576,1204821,1204822,1208251,1208252,1208329,1208501],[0.681203943648,-0.764870280075,-0.700849362993,-0.677445036185,-0.799256859902,-0.976931518905,-0.766949007346,-2.01345156334,-1.49523185485,-1.81003891261,-1.51114707284,-1.14736654578,-1.01360352028,-0.592535074481,-0.845522026805,0.283534790813,-0.31888005111,-0.979862005505,-0.903139387374,-0.110685361272,-0.214922612119,0.380995144121,0.134089904891])
Intercept: -0.630171091208


In [39]:
train_pred = lrModel.transform(output.select('features'))

In [46]:
train_pred.show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(1208503,[1208251...|[0.74085645247987...|[0.67718311041802...|       0.0|
|(1208503,[1197502...|[1.39504137128290...|[0.80139584987245...|       0.0|
|(1208503,[1197502...|[1.39504137128290...|[0.80139584987245...|       0.0|
|(1208503,[1197502...|[1.39504137128290...|[0.80139584987245...|       0.0|
|(1208503,[1204821...|[1.61003309671347...|[0.83341598141719...|       0.0|
|(1208503,[1197502...|[1.39504137128290...|[0.80139584987245...|       0.0|
|(1208503,[1200581...|[1.77753763698643...|[0.85539254758090...|       0.0|
|(1208503,[1200581...|[1.77753763698643...|[0.85539254758090...|       0.0|
|(1208503,[1204576...|[0.94905114231754...|[0.72092431504796...|       0.0|
|(1208503,[1197502...|[1.39504137128290...|[0.80139584987245...|       0.0|
|(1208503,[1

In [50]:
train_pred.agg(F.max('probability'), F.mean('probability')).show()

+--------------------+
|    max(probability)|
+--------------------+
|[0.93361683664692...|
+--------------------+

