# Background

Until now, my only experience with Spark was through Databricks, in the 'Introduction to Spark' [EdX class](https://www.edx.org/course/introduction-apache-spark-uc-berkeleyx-cs105x). There I found that the Databricks environment was similar to Jupyter notebooks, and that the syntax and operations of PySpark <strong>DataFrames</strong> were similar to what I'd use in SQL and pandas. So I decided it was a good time to test Spark locally. In this walkthrough, I hope to load some Big Data (22 GB), perform some standard operations on it, and perhaps even try some simple machine learning using MLlib.

## Loading Data

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import seaborn as sns
# Silence library warnings to keep the notebook output readable
warnings.filterwarnings('ignore')

I moved my 22 GB CSV file (courtesy of [Kaggle](https://www.kaggle.com/c/acquire-valued-shoppers-challenge/data?transactions.csv.gz)) into my Spark folder. Recall that to read a CSV file with this Spark setup, you need to launch PySpark from the terminal with the spark-csv package flag (this applies to the notebook too). In this case, the command is:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.5.0

In [2]:
# Let's double-check that the SparkContext is loaded on our machine
sc

<pyspark.context.SparkContext at 0x1099879d0>

In addition to using the flag from the terminal, I also have to name the Databricks package in the `format` call in order to read the CSV file properly into a PySpark DataFrame.

In [3]:
# header: treat the first line as column names; inferschema: guess column types
data = sqlContext.read.format('com.databricks.spark.csv').options(
    header='true', inferschema='true').load('kaggle_shoppers_transactions.csv')

Sweet! It took about 5 minutes to 'load' (presumably most of that was Spark scanning the file to infer the schema, since nothing is held in memory yet), but I was able to get it in there :) If you're not familiar with Spark, I should point out that this 22 GB file would not fit into memory on this computer (16 GB of RAM). Since pandas needs to load the whole table at once to process it, it is not a viable option here. Although you can process a table in chunks manually with pandas and Python, Spark does this for us. This is where it excels.
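
To make the 'chunks at a time' idea concrete, here is a minimal sketch of the manual approach in plain Python. It streams a CSV row by row and keeps only running totals in memory, so the size of the file never matters. The function name `streaming_mean` and the tiny in-memory CSV are illustrative assumptions, not part of the original notebook; pandas offers the same pattern through `pd.read_csv(..., chunksize=...)`.

```python
import csv
import io


def streaming_mean(lines, column):
    """Average one numeric CSV column while holding a single row in memory.

    `lines` is any iterable of text lines (an open file works), so the
    whole dataset never has to fit in RAM.
    """
    total = 0.0
    count = 0
    for row in csv.DictReader(lines):
        total += float(row[column])
        count += 1
    return total / count if count else float('nan')


# Tiny stand-in for the real file (hypothetical three-row excerpt).
sample_csv = io.StringIO(
    u"id,purchaseamount\n"
    u"86246,7.59\n"
    u"86246,1.59\n"
    u"86246,5.99\n"
)
mean_amount = streaming_mean(sample_csv, 'purchaseamount')
print(round(mean_amount, 2))
```

This works for simple aggregates, but every new question means writing another loop by hand, which is the bookkeeping Spark takes over.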

Instead of fetching data from disk, processing it in memory, and writing the results back to disk at every step, Spark tries to do most of its work in memory to increase execution speed. If the data doesn't fit into memory, it spills the remainder to disk, managing its resources automatically :)

Let's check that the object is a dataframe and that we're getting the table we expect.

In [4]:
type(data)

pyspark.sql.dataframe.DataFrame

In [5]:
data.show(5)

+-----+-----+----+--------+----------+-----+----------+-----------+--------------+----------------+--------------+
|   id|chain|dept|category|   company|brand|      date|productsize|productmeasure|purchasequantity|purchaseamount|
+-----+-----+----+--------+----------+-----+----------+-----------+--------------+----------------+--------------+
|86246|  205|   7|     707|1078778070|12564|2012-03-02|       12.0|            OZ|               1|          7.59|
|86246|  205|  63|    6319| 107654575|17876|2012-03-02|       64.0|            OZ|               1|          1.59|
|86246|  205|  97|    9753|1022027929|    0|2012-03-02|        1.0|            CT|               1|          5.99|
|86246|  205|  25|    2509| 107996777|31373|2012-03-02|       16.0|            OZ|               1|          1.99|
|86246|  205|  55|    5555| 107684070|32094|2012-03-02|       16.0|            OZ|               2|         10.38|
+-----+-----+----+--------+----------+-----+----------+-----------+--------------+----------------+--------------+

## Data Exploration

Here I'll explore the data and see if I can find anything interesting. Let's start by finding the number of distinct companies we have at our disposal and how far back in time our measurements go.
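
A minimal sketch of those two queries, assuming `data` is the DataFrame loaded above. The helper names `count_distinct` and `date_range` are hypothetical. Because `date` is stored as a string in `YYYY-MM-DD` form, its lexicographic minimum and maximum are also the earliest and latest dates. Both calls scan the full file, so expect them to be slow on the 22 GB table.

```python
def count_distinct(df, column):
    """Number of distinct values in `column` of a Spark DataFrame."""
    return df.select(column).distinct().count()


def date_range(df, column='date'):
    """Earliest and latest value of an ISO-formatted date column."""
    row = df.selectExpr(
        'min({0}) AS first_date'.format(column),
        'max({0}) AS last_date'.format(column),
    ).first()
    # A Row behaves like a tuple, so positional indexing is enough here.
    return row[0], row[1]


# Usage inside the notebook (needs the live `data` DataFrame):
# n_companies = count_distinct(data, 'company')
# first_date, last_date = date_range(data)
```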

In [8]:
# Let's sample a bit of our data (5% of rows, without replacement)
dataSample = data.sample(withReplacement=False, fraction=0.05, seed=20)
dataSample.show(5)

+-----+-----+----+--------+---------+-----+----------+-----------+--------------+----------------+--------------+
|   id|chain|dept|category|  company|brand|      date|productsize|productmeasure|purchasequantity|purchaseamount|
+-----+-----+----+--------+---------+-----+----------+-----------+--------------+----------------+--------------+
|86246|  205|   8|     814|102840020|18584|2012-03-02|       15.5|            OZ|               1|          3.29|
|86246|  205|   8|     815|103900030|13296|2012-03-02|        8.0|            OZ|               1|          1.89|
|86246|  205|  23|    2301|101116616|15266|2012-03-02|       16.0|            OZ|               2|          2.59|
|86246|  205|  32|    3202|101116616|15266|2012-03-02|       24.0|            OZ|               1|          3.19|
|86246|  205|  34|    3410|107418272|16853|2012-03-02|        7.5|            OZ|               1|          2.19|
+-----+-----+----+--------+---------+-----+----------+-----------+--------------+----------------+--------------+

In [14]:
# Now let's get the dataframe's column names and
# data types like we would in pandas
print(dataSample.columns)
dataSample.printSchema()  # prints the schema itself and returns None

['id', 'chain', 'dept', 'category', 'company', 'brand', 'date', 'productsize', 'productmeasure', 'purchasequantity', 'purchaseamount']
root
 |-- id: long (nullable = true)
 |-- chain: integer (nullable = true)
 |-- dept: integer (nullable = true)
 |-- category: integer (nullable = true)
 |-- company: long (nullable = true)
 |-- brand: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- productsize: double (nullable = true)
 |-- productmeasure: string (nullable = true)
 |-- purchasequantity: integer (nullable = true)
 |-- purchaseamount: double (nullable = true)


In [15]:
#Now let's count the number of items in our dataframe
dataSample.count()

17476413

That took a long time, with good reason: `count()` has to scan the entire 22 GB file before it can report how many rows landed in the sample. Let's redefine the sample to be even smaller. Instead of 5%, let's try 0.01%.

In [17]:
# New sampling: 0.01% of rows
dataSample = data.sample(withReplacement=False, fraction=0.0001, seed=20)
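
As a rough illustration of why a tiny sample can still be useful: `sample` without replacement keeps each row independently with probability `fraction`, so the sample size is only approximately `fraction` times the total, and totals can be estimated by scaling back up. The sketch below mimics that idea in plain Python on synthetic numbers. The `bernoulli_sample` helper and the data are made up for illustration; this is not Spark's implementation.

```python
import random


def bernoulli_sample(values, fraction, seed):
    """Keep each value independently with probability `fraction`."""
    rng = random.Random(seed)
    return [v for v in values if rng.random() < fraction]


# Synthetic "purchase amounts": 100,000 made-up values between 0 and 20.
rng = random.Random(0)
population = [rng.uniform(0, 20) for _ in range(100000)]

fraction = 0.05
sample = bernoulli_sample(population, fraction, seed=20)

# Scale the sample count back up to estimate the full row count.
estimated_rows = len(sample) / fraction
# The sample mean estimates the population mean directly.
estimated_mean = sum(sample) / len(sample)
true_mean = sum(population) / len(population)

print(len(population), int(round(estimated_rows)))
print(round(true_mean, 2), round(estimated_mean, 2))
```

Applying the same scaling to the count above, 17,476,413 / 0.05 suggests roughly 350 million rows in the full table, so the 0.01% sample should hold on the order of 35,000 rows.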

Now that we have a much smaller sample, hopefully some of our operations will run faster. We won't get exact values since we're only using a sample, but they should be good estimates for statistics such as averages. Let's start by <strong>caching</strong> our sample, since we know we'll be using it a lot. Note that `cache()` is lazy: the data is only stored once an action (such as the `count()` below) runs, after which it can be accessed much more quickly.

In [19]:
#Cache data sample
dataSample.cache()

DataFrame[id: bigint, chain: int, dept: int, category: int, company: bigint, brand: int, date: string, productsize: double, productmeasure: string, purchasequantity: int, purchaseamount: double]

In [None]:
dataSample.count()

Now let's look at the distribution of purchase amounts. To use Python's plotting libraries, we first need to collect the data from the 'purchaseamount' column into either a pandas Series or a list. I'll choose a Series.
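
One way to do that, sketched under the assumption that `dataSample` is small enough to fit on the driver: `toPandas()` collects the selected column into a pandas DataFrame, from which the Series can be pulled out by name. The helper name `column_to_series` is hypothetical.

```python
def column_to_series(df, column):
    """Collect one column of a (small!) Spark DataFrame as a pandas Series.

    toPandas() pulls every selected row onto the driver, so only call this
    on a sample that comfortably fits in local memory.
    """
    return df.select(column).toPandas()[column]


# Usage inside the notebook (needs the live `dataSample` DataFrame):
# purch_amounts = column_to_series(dataSample, 'purchaseamount')
# plt.hist(purch_amounts.values, bins=50)
# plt.xlabel('purchaseamount')
# plt.show()
```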

In [None]:
Even this took a little too long. I've read that I can speed things up by using fewer '

In [22]:
# Austin said this was the best way to work with subsets of data.
# take() requires the number of rows to fetch; 5 is an arbitrary choice.
data.take(5)
# plt.hist(data.select('purchaseamount').take(n))
# Create a new pandas data series
# purchAmounts = data.select('purchaseamount').take(n)
# * How to set # of executors, start Spark job with local (google this).
# * sc.parallelize() takes data and cuts it up into pieces
# * each executor/slave/core will work on each piece one at a time
# * If parallelize doesn't work, you can take
# * sc.textFile and sc.binaryFiles and sc.wholeTextFiles

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/Users/diego/desktop/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 690, in start
    self.socket.connect((self.address, self.port))
  File "//anaconda/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server