# Yoochoose: Aggregate Sessions

Here we download the Yoochoose data, upload it to HDFS, and use Spark to aggregate the click and buy events into per-session objects.

### Download and extract the data

In [1]:
```
!wget http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z
```

--2015-07-07 15:18:40--  http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z
Resolving s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)... 54.231.136.80
Connecting to s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)|54.231.136.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 287211932 (274M) [application/octet-stream]
Saving to: `yoochoose-data.7z'


2015-07-07 15:18:54 (19.9 MB/s) - `yoochoose-data.7z' saved [287211932/287211932]
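Before extracting, it can be worth confirming that the archive downloaded completely. A minimal sketch, assuming the byte count from the `Length:` line of the wget log is the expected size; `download_is_complete` is a hypothetical helper, and a size match is only a cheap sanity check, not a checksum.

```python
import os

# Assumption: the size reported in the wget log ("Length: 287211932").
EXPECTED_ARCHIVE_SIZE = 287211932

def download_is_complete(path, expected_size=EXPECTED_ARCHIVE_SIZE):
    """Return True if `path` is a file of exactly `expected_size` bytes.

    Hypothetical helper: compares sizes only, it does not verify contents.
    """
    return os.path.isfile(path) and os.path.getsize(path) == expected_size
```

For example, `download_is_complete('yoochoose-data.7z')` should be `True` before running `7z x`.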



In [8]:
```
!7z x yoochoose-data.7z
```


7-Zip [64] 15.09 beta : Copyright (c) 1999-2015 Igor Pavlov : 2015-10-16
p7zip Version 15.09 beta (locale=utf8,Utf16=on,HugeFiles=on,64 bits,8 CPUs x64)

Scanning the drive for archives:
  0M Scan         1 file, 287211932 bytes (274 MiB)

Extracting archive: yoochoose-data.7z
--
Path = yoochoose-data.7z
Type = 7z
Physical Size = 287211932
Headers Size = 255
Method = LZMA:24
Solid = +
Blocks = 2


### Put the data into HDFS

In [1]:
```
!hdfs dfs -mkdir -p yoochoose/
!hdfs dfs -put yoochoose-*.dat yoochoose/
```

16/04/10 01:32:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/10 01:32:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `yoochoose/yoochoose-buys.dat': File exists
put: `yoochoose/yoochoose-clicks.dat': File exists
put: `yoochoose/yoochoose-test.dat': File exists


rm: yoochoose-*: No such file or directory
rm: dataset-README.txt: No such file or directory
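The `put: ... File exists` messages above mean the files were already in HDFS from an earlier run; by default `hdfs dfs -put` does not overwrite existing files. One way to make the cell safe to re-run is the `-f` flag, which overwrites the destination. The sketch below only builds that command as an argument list; `hdfs_put_command` is a hypothetical helper, not part of any Hadoop client library.

```python
def hdfs_put_command(local_paths, hdfs_dir, overwrite=False):
    """Build an `hdfs dfs -put` argument list (hypothetical helper).

    With overwrite=True the `-f` flag is added, so existing destination
    files are replaced instead of failing with "File exists".
    """
    cmd = ['hdfs', 'dfs', '-put']
    if overwrite:
        cmd.append('-f')
    cmd.extend(local_paths)  # one or more local source files
    cmd.append(hdfs_dir)     # destination directory comes last
    return cmd
```

For example, `subprocess.check_call(hdfs_put_command(sorted(glob.glob('yoochoose-*.dat')), 'yoochoose/', overwrite=True))`. Because `subprocess` with a list does not go through a shell, the `yoochoose-*.dat` wildcard has to be expanded with `glob` first.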


### Aggregate sessions

In [2]:
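```python
import datetime
import operator

def parse_datetime(dt_str):
    # Timestamps look like 2014-04-07T10:51:09.277Z; the trailing 'Z' is matched literally.
    return datetime.datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S.%fZ')

def parse_clicks(line):
    # Click line: session_id,timestamp,item_id,category
    parts = line.split(',')
    session_id = int(parts[0])
    timestamp, item_id, category = parse_datetime(parts[1]), parts[2], parts[3]
    return session_id, (timestamp, item_id, category)

def parse_buys(line):
    # Buy line: session_id,timestamp,item_id,price,quantity
    parts = line.split(',')
    session_id = int(parts[0])
    timestamp, item_id, price, quantity = parse_datetime(parts[1]), parts[2], float(parts[3]), int(parts[4])
    return session_id, (timestamp, item_id, price, quantity)

def sort_sessions(record):
    # record is (session_id, (clicks, buys)); either side is None when the
    # full outer join found no events of that kind for the session.
    # Unpacking in the body instead of a tuple parameter also works on Python 3.
    session_id, (clicks, buys) = record
    clicks = sorted(clicks, key=operator.itemgetter(0)) if clicks is not None else []
    buys = sorted(buys, key=operator.itemgetter(0)) if buys is not None else []
    return session_id, (clicks, buys)
```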
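To check the timestamp format without a cluster, the parsing logic can be exercised on a single line in plain Python. This is a sketch: `parse_click_line` is a hypothetical standalone copy of the logic in `parse_clicks`, and the sample line is made up to match the `session_id,timestamp,item_id,category` layout. `%f` accepts the three-digit milliseconds and stores them as microseconds (`.277` becomes `277000`), and because the `Z` is matched as a literal character, the result carries no timezone.

```python
import datetime

# Same format string as parse_datetime; 'Z' is a literal, not a timezone directive.
YC_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

def parse_click_line(line):
    """Hypothetical copy of parse_clicks: returns (session_id, (ts, item_id, category))."""
    session_id, ts, item_id, category = line.split(',')
    return int(session_id), (datetime.datetime.strptime(ts, YC_TIMESTAMP_FORMAT), item_id, category)

# Made-up sample line in the layout parse_clicks expects.
sample = '1,2014-04-07T10:51:09.277Z,214536502,0'
print(parse_click_line(sample))
```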

#### Run Spark job

In [12]:
```python
# Set up the Spark context.
from pyspark import SparkContext

CLUSTER_URL = 'spark://localhost:7077'
sc = SparkContext(CLUSTER_URL, 'pyspark')
print(sc)
```

<pyspark.context.SparkContext object at 0x110c70890>


In [13]:
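```python
# Read the input (40 = minimum number of partitions) and group events by session id.
# These are lazy transformations: nothing is computed until an action such as
# saveAsPickleFile runs, so the messages only mark that each RDD has been defined.
clicks = sc.textFile('yoochoose-clicks.dat', 40).map(parse_clicks).groupByKey()
print('Finished Computing Clicks')
clicksTest = sc.textFile('yoochoose-test.dat', 40).map(parse_clicks).groupByKey()
print('Finished computing clicksTest')
buys = sc.textFile('yoochoose-buys.dat', 40).map(parse_buys).groupByKey()
print('Finished computing buys')
```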


Finished Computing Clicks
Finished computing clicksTest
Finished computing buys
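The `groupByKey` calls above, together with the `fullOuterJoin` and `sort_sessions` step in the next cell, can be mimicked with dictionaries to see what a session record looks like. This is a plain-Python sketch on made-up toy events (integers stand in for timestamps); `group_by_key`, `full_outer_join` and `sort_session` are hypothetical stand-ins, not Spark APIs, and they hold everything in memory.

```python
from collections import defaultdict
from operator import itemgetter

def group_by_key(pairs):
    """Stand-in for RDD.groupByKey(): key -> list of values."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def full_outer_join(left, right):
    """Stand-in for RDD.fullOuterJoin(): key -> (left value or None, right value or None)."""
    return {k: (left.get(k), right.get(k)) for k in set(left) | set(right)}

def sort_session(record):
    """Mirror of sort_sessions: sort both sides by timestamp, None becomes []."""
    session_id, (clicks, buys) = record
    clicks = sorted(clicks, key=itemgetter(0)) if clicks is not None else []
    buys = sorted(buys, key=itemgetter(0)) if buys is not None else []
    return session_id, (clicks, buys)

# Toy events; the names avoid clobbering the notebook's clicks/buys RDDs.
toy_clicks = group_by_key([(1, (2, 'itemA', '0')), (1, (1, 'itemB', '0')), (2, (5, 'itemC', 'S'))])
toy_buys = group_by_key([(1, (3, 'itemB', 100.0, 1))])
toy_sessions = dict(sort_session(r) for r in full_outer_join(toy_clicks, toy_buys).items())
```

Session 2 has clicks but no buys, so the join yields `None` on the buy side, which `sort_session` turns into an empty list; this is the case `sort_sessions` guards against.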


In [14]:
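```python
# Aggregate sessions. A full outer join keeps sessions that appear on only one
# side; the missing side is None, which sort_sessions turns into an empty list.
train_sessions = clicks.fullOuterJoin(buys).map(sort_sessions)
print('Finished computing train sessions')
# The test file has clicks only, so every session gets None for its buys.
test_sessions = clicksTest.mapValues(lambda session_clicks: (session_clicks, None)).map(sort_sessions)
print('Finished Computing test sessions')

# Create the session files. saveAsPickleFile fails if the output directory
# already exists, as the error below shows.
train_sessions.saveAsPickleFile('train_sessions.pickle')
test_sessions.saveAsPickleFile('test_sessions.pickle')
print('Finished persisting pickle files')

sc.stop()
```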

Finished computing train sessions
Finished Computing test sessions


Py4JJavaError: An error occurred while calling o227.saveAsObjectFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/senthilsrinivasan/unicorn/dev/ydf-recsys2015-challenge/notebooks/train_sessions.pickle already exists
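The error above is raised because `train_sessions.pickle` is left over from a previous run: Hadoop's output check refuses to write into an existing directory. A minimal sketch of one workaround, assuming the output goes to the local filesystem as in the traceback (`file:/...`); `clear_local_output` is a hypothetical helper. For output on HDFS, the directory would instead have to be removed with something like `hdfs dfs -rm -r train_sessions.pickle`.

```python
import os
import shutil

def clear_local_output(path):
    """Delete a local Spark output directory if it exists (hypothetical helper).

    Returns True if something was removed, False if the path did not exist.
    Only handles local paths, not HDFS.
    """
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False
```

Calling `clear_local_output('train_sessions.pickle')` and `clear_local_output('test_sessions.pickle')` before the two `saveAsPickleFile` calls would let the cell be re-run. Deleting output cannot be undone, so writing each run to a fresh path is a safer alternative.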
