### Let's first configure the Spark cluster
Add the jars for H2O Sparkling Water and spark-csv. Also set the driver memory to match the size of the cluster's VMs.
<br><b>Make sure that you:<br> </b>
1. Add the right Sparkling Water version
2. Set the driver and executor memory to 80% of the RAM of the worker VM type selected at cluster creation
>The default worker VM type is D13_v2 (56G RAM, 8 cores), so 80% of RAM = 45G<br>
>For information on all available VM sizes, click [here](https://azure.microsoft.com/en-us/documentation/articles/virtual-machines-windows-sizes/#dv2-series) 
3. Set numExecutors to the number of worker nodes created in the cluster
>The default number of worker nodes is 3<br>
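
The 80% rule above can be worked out in a few lines of Python. This is only a minimal sketch: the helper name `memory_setting` and the rounding to whole gigabytes are assumptions made for illustration, not part of the Sparkling Water or HDInsight tooling.

```python
import json

def memory_setting(ram_gb, fraction=0.8):
    """Return a Spark memory string such as '45G' for a share of the VM's RAM."""
    # Hypothetical helper: rounds to the nearest whole gigabyte
    return "{}G".format(int(round(ram_gb * fraction)))

# Default worker VM from the note above: D13_v2 with 56G RAM, and 3 worker nodes
settings = {
    "driverMemory": memory_setting(56),
    "executorMemory": memory_setting(56),
    "numExecutors": 3,
}
print(json.dumps(settings, indent=4, sort_keys=True))
```

For a different VM size, pass its RAM in gigabytes to `memory_setting` and copy the resulting values into the `%%configure` cell.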

In [1]:
%%configure -f
{
    "conf":{
        "spark.jars.packages":"ai.h2o:sparkling-water-core_2.10:1.6.8,com.databricks:spark-csv_2.10:1.5.0",
        "spark.locality.wait":"3000",
        "spark.scheduler.minRegisteredResourcesRatio":"1",
        "spark.task.maxFailures":"1",
        "spark.yarn.am.extraJavaOptions":"-XX:MaxPermSize=384m",
        "spark.yarn.max.executor.failures":"1",
        "maximizeResourceAllocation": "true"
    },
    "driverMemory":"45G",
    "executorMemory":"45G",
    "numExecutors":3
}

Here we add the PySpark egg file from the downloaded H2O Sparkling Water distribution.
<br><b> Make sure that the name of the egg file below matches the downloaded distribution of Sparkling Water</b>

In [2]:
import os
os.environ["PYTHON_EGG_CACHE"] = "~/"

sc.addPyFile('wasb:///H2O-Sparkling-Water/py/dist/h2o_pysparkling_1.6-1.6.8-py2.7.egg')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1479023752017_0008,pyspark,idle,Link,Link,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


### Now the coding starts..

In [3]:
# Initiate H2OContext on top of Spark
# Note that "H2O cluster total nodes" is N-1, where N is the number of worker nodes. This is because the Spark deploy mode is yarn-cluster
from pysparkling import *
hc = H2OContext.getOrCreate(sc)

Checking whether there is an H2O instance running at http://192.168.0.7:54321. connected.
--------------------------  -------------------------------
H2O cluster uptime:         09 secs
H2O cluster version:        3.10.0.7
H2O cluster version age:    1 month and 24 days
H2O cluster name:           sparkling-water-yarn_-607396080
H2O cluster total nodes:    2
H2O cluster free memory:    86.2 Gb
H2O cluster total cores:    16
H2O cluster allowed cores:  16
H2O cluster status:         accepting new members, healthy
H2O connection url:         http://192.168.0.7:54321
H2O connection proxy:
Python version:             2.7.12 final
--------------------------  -------------------------------

In [4]:
# This is just a helper function returning the path to data files within the Sparkling Water project directories
def _locate(example_name): 
    return "wasb:///H2O-Sparkling-Water/examples/smalldata/" + example_name 


# Define file names
chicagoAllWeather = "chicagoAllWeather.csv"
chicagoCensus = "chicagoCensus.csv"
chicagoCrimes10k = "chicagoCrimes10k.csv"

# Add files to Spark Cluster
sc.addFile(_locate(chicagoAllWeather))
sc.addFile(_locate(chicagoCensus))
sc.addFile(_locate(chicagoCrimes10k))

# And import them into H2O
from pyspark import SparkFiles
import h2o
# The files were already added to Spark above, so we use h2o.upload_file rather than
# h2o.import_file. h2o.import_file expects a cluster-wide path (i.e. one that every
# machine in the cluster can read), whereas SparkFiles.get(..) returns the path of
# the file on the current node. h2o.upload_file handles that case: it uploads the
# file from the current node and distributes it across the H2O cluster.
f_weather = h2o.upload_file(SparkFiles.get(chicagoAllWeather))
f_census = h2o.upload_file(SparkFiles.get(chicagoCensus))
f_crimes = h2o.upload_file(SparkFiles.get(chicagoCrimes10k), col_types = {"Date": "string"})

Parse progress: [#############################################################################] 100%
Parse progress: [#############################################################################] 100%
Parse progress: [#############################################################################] 100%

In [5]:
f_weather.show()
f_census.show()
f_crimes.show()

       date    month    day    year    maxTemp    meanTemp    minTemp
-----------  -------  -----  ------  ---------  ----------  ---------
9.78307e+11        1      1    2001         23          14          6
9.78394e+11        1      2    2001         18          12          6
9.7848e+11         1      3    2001         28          18          8
9.78566e+11        1      4    2001         30          24         19
9.78653e+11        1      5    2001         36          30         21
9.78739e+11        1      6    2001         33          26         19
9.78826e+11        1      7    2001         34          28         21
9.78912e+11        1      8    2001         26          20         14
9.78998e+11        1      9    2001         23          16         10
9.79085e+11        1     10    2001         34          26         19

[5162 rows x 7 columns]
  Community Area Number  COMMUNITY AREA NAME      PERCENT OF HOUSING CROWDED    PERCENT HOUSEHOLDS BELOW POVERTY    PERCENT AGED 16+ UN

In [6]:
# Set time zone to UTC for date manipulation
h2o.set_timezone("Etc/UTC")

In [7]:
# Transform weather table
## Remove 1st column (date)
f_weather = f_weather[1:]

In [8]:
# Transform census table
## Remove all spaces from column names (causing problems in Spark SQL)
col_names = [s.strip().replace(' ', '_').replace('+', '_') for s in f_census.col_names]

## Update column names in the table
#f_weather.names = col_names
f_census.names = col_names
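
To see what the renaming does to individual headers, the same string operations can be tried on plain Python strings. This is a sketch: the function name `sanitize` is hypothetical, and the raw header strings are reconstructed from the column names used in the SQL query later in this notebook, so they may differ slightly from the actual CSV header.

```python
def sanitize(name):
    """Mirror the renaming above: trim, then replace spaces and '+' with '_'."""
    return name.strip().replace(' ', '_').replace('+', '_')

# Assumed raw headers (reconstructed, not read from the CSV file)
raw_names = [
    "Community Area Number",
    "PERCENT AGED 16+ UNEMPLOYED",
    "PERCENT AGED 25+ WITHOUT HIGH SCHOOL DIPLOMA",
]
for raw in raw_names:
    print("{!r} -> {!r}".format(raw, sanitize(raw)))
```

Because `+` is followed by a space in these headers, each such name ends up with a double underscore, which is why the SQL query refers to `PERCENT_AGED_16__UNEMPLOYED`.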

In [9]:
# Transform crimes table

## Drop useless columns
f_crimes = f_crimes[2:]

## Replace ' ' by '_' in column names
col_names = [s.replace(' ', '_') for s in f_crimes.col_names]
f_crimes.names = col_names

## Refine date column
def refine_date_col(data, col, pattern):
    data[col]         = data[col].as_date(pattern)
    data["Day"]       = data[col].day()
    data["Month"]     = data[col].month()
    data["Year"]      = data[col].year()
    data["WeekNum"]   = data[col].week()
    data["WeekDay"]   = data[col].dayOfWeek()
    data["HourOfDay"] = data[col].hour()
    
    data.describe() # HACK: Force evaluation before ifelse and cut. See PUBDEV-1425.
        
    # Create weekend and season cols
    data["Weekend"] = ((data["WeekDay"] == "Sun") | (data["WeekDay"] == "Sat"))
    data["Season"] = data["Month"].cut([0, 2, 5, 7, 10, 12], ["Winter", "Spring", "Summer", "Autumn", "Winter"])
    
refine_date_col(f_crimes, "Date", "%m/%d/%Y %I:%M:%S %p")
f_crimes = f_crimes.drop("Date")
f_crimes.describe()

Rows:9999
Cols:25


         Date               Block                 IUCR           Primary_Type     Description                   Location_Description    Arrest          Domestic        Beat           District       Ward           Community_Area    FBI_Code       X_Coordinate    Y_Coordinate    Year    Updated_On         Latitude         Longitude        Location                       Day            Month           WeekNum         WeekDay    HourOfDay
-------  -----------------  --------------------  -------------  ---------------  ----------------------------  ----------------------  --------------  --------------  -------------  -------------  -------------  ----------------  -------------  --------------  --------------  ------  -----------------  ---------------  ---------------  -----------------------------  -------------  --------------  --------------  ---------  -------------
type     int                enum                  int            enum             enum              
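
The `Season` column comes from binning `Month` with the breaks `[0, 2, 5, 7, 10, 12]`. The sketch below reproduces that binning in plain Python so the month-to-season mapping can be inspected. It assumes right-closed intervals `(lo, hi]`, which is believed to be the default behaviour of `cut`; the function name `season_for_month` is hypothetical.

```python
from bisect import bisect_left

BREAKS = [0, 2, 5, 7, 10, 12]
LABELS = ["Winter", "Spring", "Summer", "Autumn", "Winter"]

def season_for_month(month):
    """Label a month using right-closed bins (lo, hi], e.g. (2, 5] -> Spring."""
    if not BREAKS[0] < month <= BREAKS[-1]:
        raise ValueError("month outside the binned range: {}".format(month))
    # bisect_left finds the first break >= month; the bin index is one less
    return LABELS[bisect_left(BREAKS, month) - 1]

for month in range(1, 13):
    print("{:2d} {}".format(month, season_for_month(month)))
```

Under this assumption month 8 lands in "Autumn", whereas the `get_season` helper further down in this notebook returns "Summer" for month 8, so the two may disagree for August.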

In [10]:
# Expose H2O frames as Spark DataFrame

df_weather = hc.as_spark_frame(f_weather)
df_census = hc.as_spark_frame(f_census)
df_crimes = hc.as_spark_frame(f_crimes)

In [11]:
df_weather.show()

+-----+---+----+-------+--------+-------+
|month|day|year|maxTemp|meanTemp|minTemp|
+-----+---+----+-------+--------+-------+
|    1|  1|2001|     23|      14|      6|
|    1|  2|2001|     18|      12|      6|
|    1|  3|2001|     28|      18|      8|
|    1|  4|2001|     30|      24|     19|
|    1|  5|2001|     36|      30|     21|
|    1|  6|2001|     33|      26|     19|
|    1|  7|2001|     34|      28|     21|
|    1|  8|2001|     26|      20|     14|
|    1|  9|2001|     23|      16|     10|
|    1| 10|2001|     34|      26|     19|
|    1| 11|2001|     39|      28|     18|
|    1| 12|2001|     37|      31|     25|
|    1| 13|2001|     35|      34|     33|
|    1| 14|2001|     36|      34|     32|
|    1| 15|2001|     35|      32|     30|
|    1| 16|2001|     30|      28|     26|
|    1| 17|2001|     26|      22|     19|
|    1| 18|2001|     30|      24|     19|
|    1| 19|2001|     27|      22|     17|
|    1| 20|2001|     24|      18|     10|
+-----+---+----+-------+--------+-

In [12]:
# Use Spark SQL to join datasets

## Register DataFrames as tables in SQL context
sqlContext.registerDataFrameAsTable(df_weather, "chicagoWeather")
sqlContext.registerDataFrameAsTable(df_census, "chicagoCensus")
sqlContext.registerDataFrameAsTable(df_crimes, "chicagoCrime")


crimeWithWeather = sqlContext.sql("""SELECT
a.Year, a.Month, a.Day, a.WeekNum, a.HourOfDay, a.Weekend, a.Season, a.WeekDay,
a.IUCR, a.Primary_Type, a.Location_Description, a.Community_Area, a.District,
a.Arrest, a.Domestic, a.Beat, a.Ward, a.FBI_Code,
b.minTemp, b.maxTemp, b.meanTemp,
c.PERCENT_AGED_UNDER_18_OR_OVER_64, c.PER_CAPITA_INCOME, c.HARDSHIP_INDEX,
c.PERCENT_OF_HOUSING_CROWDED, c.PERCENT_HOUSEHOLDS_BELOW_POVERTY,
c.PERCENT_AGED_16__UNEMPLOYED, c.PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA
FROM chicagoCrime a
JOIN chicagoWeather b
ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day
JOIN chicagoCensus c
ON a.Community_Area = c.Community_Area_Number""")
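
The join logic can be tried on a laptop without a cluster. The sketch below runs the same kind of three-way inner join with Python's built-in `sqlite3` module instead of Spark SQL; the tables are cut down to a few columns and the rows are hypothetical toy values, not taken from the Chicago data sets.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE crime   (Year INT, Month INT, Day INT, Community_Area INT, Primary_Type TEXT);
CREATE TABLE weather (year INT, month INT, day INT, meanTemp INT);
CREATE TABLE census  (Community_Area_Number INT, HARDSHIP_INDEX INT);

-- Hypothetical toy rows for illustration only
INSERT INTO crime   VALUES (2015, 2, 8, 46, 'NARCOTICS'), (2015, 2, 9, 99, 'THEFT');
INSERT INTO weather VALUES (2015, 2, 8, 20), (2015, 2, 9, 25);
INSERT INTO census  VALUES (46, 75);
""")

rows = conn.execute("""
SELECT a.Primary_Type, b.meanTemp, c.HARDSHIP_INDEX
FROM crime a
JOIN weather b ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day
JOIN census c  ON a.Community_Area = c.Community_Area_Number
""").fetchall()
print(rows)
```

Only the first crime survives, because community area 99 has no census row. An inner join silently drops such records, so it is worth comparing row counts before and after the join.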

In [13]:
crimeWithWeather.show()

+----+-----+---+-------+---------+-------+------+-------+----+--------------------+--------------------+--------------+--------+------+--------+----+----+--------+-------+-------+--------+--------------------------------+-----------------+--------------+--------------------------+--------------------------------+---------------------------+--------------------------------------------+
|Year|Month|Day|WeekNum|HourOfDay|Weekend|Season|WeekDay|IUCR|        Primary_Type|Location_Description|Community_Area|District|Arrest|Domestic|Beat|Ward|FBI_Code|minTemp|maxTemp|meanTemp|PERCENT_AGED_UNDER_18_OR_OVER_64|PER_CAPITA_INCOME|HARDSHIP_INDEX|PERCENT_OF_HOUSING_CROWDED|PERCENT_HOUSEHOLDS_BELOW_POVERTY|PERCENT_AGED_16__UNEMPLOYED|PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA|
+----+-----+---+-------+---------+-------+------+-------+----+--------------------+--------------------+--------------+--------+------+--------+----+----+--------+-------+-------+--------+--------------------------------+---

In [14]:
# Publish Spark DataFrame as H2OFrame with given name
crimeWithWeatherHF = hc.as_h2o_frame(crimeWithWeather, "crimeWithWeatherTable")

In [15]:
# Transform selected String columns to categoricals
crimeWithWeatherHF["Arrest"] = crimeWithWeatherHF["Arrest"].asfactor()
crimeWithWeatherHF["Season"] = crimeWithWeatherHF["Season"].asfactor()
crimeWithWeatherHF["WeekDay"] = crimeWithWeatherHF["WeekDay"].asfactor()
crimeWithWeatherHF["Primary_Type"] = crimeWithWeatherHF["Primary_Type"].asfactor()
crimeWithWeatherHF["Location_Description"] = crimeWithWeatherHF["Location_Description"].asfactor()
crimeWithWeatherHF["Domestic"] = crimeWithWeatherHF["Domestic"].asfactor()

In [16]:
# Split frame into two - we use one as the training frame and the second one as the validation frame
splits = crimeWithWeatherHF.split_frame(ratios=[0.8])
train = splits[0]
test = splits[1]

# Prepare column names
predictor_columns = train.drop("Arrest").col_names
response_column = "Arrest"

In [17]:
# Create and train GBM model
from h2o.estimators.gbm import H2OGradientBoostingEstimator

# Prepare model based on the given set of parameters
gbm_model = H2OGradientBoostingEstimator(ntrees       = 50,
                                         max_depth    = 3,
                                         learn_rate   = 0.1,
                                         distribution = "bernoulli")

# Train the model
gbm_model.train(x                = predictor_columns,
                y                = response_column,
                training_frame   = train,
                validation_frame = test)

gbm Model Build progress: [###################################################################] 100%

In [18]:
# Show GBM model performance
gbm_model.model_performance(test)


ModelMetricsBinomial: gbm
** Reported on test data. **

MSE: 0.0923998967481
RMSE: 0.303973513235
LogLoss: 0.306765195427
Mean Per-Class Error: 0.170682684378
AUC: 0.911157579336
Gini: 0.822315158672
Confusion Matrix (Act/Pred) for max f1 @ threshold = 0.487593306922: 
       false    true    Error    Rate
-----  -------  ------  -------  --------------
false  1395     38      0.0265   (38.0/1433.0)
true   197      375     0.3444   (197.0/572.0)
Total  1592     413     0.1172   (235.0/2005.0)
Maximum Metrics: Maximum metrics at their respective thresholds

metric                       threshold    value     idx
---------------------------  -----------  --------  -----
max f1                       0.487593     0.761421  120
max f2                       0.168189     0.797277  275
max f0point5                 0.649991     0.849421  97
max accuracy                 0.529157     0.883292  113
max precision                0.980588     1         0
max recall                   0.0407877    1  
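
The reported figures can be cross-checked by hand from the confusion matrix. The sketch below recomputes F1, the overall error rate and Gini from the counts and the AUC printed above; the variable names are chosen here for illustration.

```python
# Counts copied from the GBM confusion matrix above (threshold = 0.487593)
tn, fp = 1395, 38    # actual false: predicted false / predicted true
fn, tp = 197, 375    # actual true:  predicted false / predicted true

precision = tp / float(tp + fp)
recall    = tp / float(tp + fn)
f1        = 2 * precision * recall / (precision + recall)
error     = (fp + fn) / float(tn + fp + fn + tp)

# Gini is a rescaling of AUC: Gini = 2 * AUC - 1
auc  = 0.911157579336
gini = 2 * auc - 1

print("precision=%.4f recall=%.4f f1=%.6f error=%.4f gini=%.6f"
      % (precision, recall, f1, error, gini))
```

The recomputed F1 and error rate agree with the `max f1` row and the `Total` error in the output, and `2 * AUC - 1` reproduces the reported Gini.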

In [19]:
# Create and train deeplearning model
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

# Prepare model based on the given set of parameters
dl_model = H2ODeepLearningEstimator()

# Train the model
dl_model.train(x                = predictor_columns,
               y                = response_column,
               training_frame   = train,
               validation_frame = test)

deeplearning Model Build progress: [##########################################################] 100%

In [20]:
# Show deeplearning model performance
dl_model.model_performance(test)


ModelMetricsBinomial: deeplearning
** Reported on test data. **

MSE: 0.109720903009
RMSE: 0.331241457261
LogLoss: 0.349334813477
Mean Per-Class Error: 0.189773764268
AUC: 0.891829210566
Gini: 0.783658421132
Confusion Matrix (Act/Pred) for max f1 @ threshold = 0.573329228933: 
       false    true    Error    Rate
-----  -------  ------  -------  --------------
false  1350     83      0.0579   (83.0/1433.0)
true   188      384     0.3287   (188.0/572.0)
Total  1538     467     0.1352   (271.0/2005.0)
Maximum Metrics: Maximum metrics at their respective thresholds

metric                       threshold    value     idx
---------------------------  -----------  --------  -----
max f1                       0.573329     0.739172  125
max f2                       0.137435     0.775635  306
max f0point5                 0.803973     0.821281  62
max accuracy                 0.709667     0.868329  85
max precision                0.999862     1         0
max recall                   0.0135966

In [21]:
# Create a crime function that builds the data row on which prediction is done
from datetime import datetime
from pytz import timezone
from pyspark.sql import Row

def get_season(dt):
    if (dt >= 3 and dt <= 5):
        return "Spring"
    elif (dt >= 6 and dt <= 8):
        return "Summer"
    elif (dt >= 9 and dt <= 10):
        return "Autumn"
    else:       
        return "Winter"
    
def crime(date,
        iucr,
        primaryType,
        locationDescr,
        domestic,
        beat,
        district,
        ward,
        communityArea,
        fbiCode,
        minTemp = 77777,
        maxTemp = 77777,
        meanTemp = 77777,
        datePattern = "%d/%m/%Y %I:%M:%S %p",
        dateTimeZone = "Etc/UTC"):

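    # Parse the date string using the supplied pattern
    dt = datetime.strptime(date, datePattern)
    # Attach the requested time zone; localize returns a new timezone-aware datetime
    dt = timezone(dateTimeZone).localize(dt)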

    crime = Row(
        Year = dt.year,
        Month = dt.month,
        Day = dt.day,
        WeekNum = dt.isocalendar()[1],
        HourOfDay = dt.hour,
        Weekend = 1 if dt.weekday() == 5 or dt.weekday() == 6 else 0,
        Season = get_season(dt.month),
        WeekDay = dt.strftime('%a'),  #gets the day of week in short format - Mon, Tue ...
        IUCR = iucr,
        Primary_Type = primaryType,
        Location_Description = locationDescr,
        Domestic = True if domestic else False,
        Beat = beat,
        District = district,
        Ward = ward,
        Community_Area = communityArea,
        FBI_Code = fbiCode,
        minTemp = minTemp,
        maxTemp = maxTemp,
        meanTemp = meanTemp
    )
    return crime
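
The date-derived fields of the row can be checked in isolation, without Spark. The sketch below uses only the standard library and returns a plain dict instead of a `Row`; the function name `date_features` is hypothetical, and the day-first pattern matches the default `datePattern` above.

```python
from datetime import datetime

def date_features(date_string, pattern="%d/%m/%Y %I:%M:%S %p"):
    """Derive the calendar fields used by the model from a date string."""
    dt = datetime.strptime(date_string, pattern)
    return {
        "Year": dt.year,
        "Month": dt.month,
        "Day": dt.day,
        "WeekNum": dt.isocalendar()[1],            # ISO week number
        "HourOfDay": dt.hour,
        "Weekend": 1 if dt.weekday() in (5, 6) else 0,  # 5 = Sat, 6 = Sun
        "WeekDay": dt.strftime("%a"),              # Mon, Tue, ...
    }

features = date_features("02/08/2015 11:43:58 PM")
print(features)
```

With the day-first pattern, "02/08/2015" is read as 2 August 2015, so the same string would mean 8 February under the month-first pattern used when parsing the training data.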

In [22]:
# Create crime examples
crime_examples = [
  crime("02/08/2015 11:43:58 PM", 1811, "NARCOTICS", "STREET",False, 422, 4, 7, 46, 18),
  crime("02/08/2015 11:00:39 PM", 1150, "DECEPTIVE PRACTICE", "RESIDENCE",False, 923, 9, 14, 63, 11)]

In [23]:
# For a given crime and model, return the probability of arrest.
def score_event(crime, model, censusTable):
    rdd = sc.parallelize([crime])
    crime_frame = sqlContext.createDataFrame(rdd)
    # Join table with census data
    df_row = censusTable.join(crime_frame).where("Community_Area = Community_Area_Number")  
    row = hc.as_h2o_frame(df_row)
    row["Season"] = row["Season"].asfactor()
    row["WeekDay"] = row["WeekDay"].asfactor()
    row["Primary_Type"] = row["Primary_Type"].asfactor()
    row["Location_Description"] = row["Location_Description"].asfactor()
    row["Domestic"] = row["Domestic"].asfactor()

    predictTable = model.predict(row)
    probOfArrest = predictTable["true"][0,0]
    return probOfArrest

# Use a loop variable that does not shadow the crime() function defined above
for crime_example in crime_examples:
    arrestProbGBM = 100*score_event(crime_example, gbm_model, df_census)
    arrestProbDLM = 100*score_event(crime_example, dl_model, df_census)

    print("""
       |Crime: """+str(crime_example)+"""
       |  Probability of arrest based on DeepLearning: """+str(arrestProbDLM)+"""
       |  Probability of arrest based on GBM: """+str(arrestProbGBM)+"""
        """)

gbm prediction progress: [####################################################################] 100%
deeplearning prediction progress: [###########################################################] 100%

       |Crime: Row(Beat=422, Community_Area=46, Day=2, District=4, Domestic=False, FBI_Code=18, HourOfDay=23, IUCR=1811, Location_Description='STREET', Month=8, Primary_Type='NARCOTICS', Season='Summer', Ward=7, WeekDay='Sun', WeekNum=31, Weekend=1, Year=2015, maxTemp=77777, meanTemp=77777, minTemp=77777)
       |  Probability of arrest best on DeepLearning: 100.0
       |  Probability of arrest best on GBM: 95.8623672438
        
gbm prediction progress: [####################################################################] 100%
deeplearning prediction progress: [###########################################################] 100%

       |Crime: Row(Beat=923, Community_Area=63, Day=2, District=9, Domestic=False, FBI_Code=11, HourOfDay=23, IUCR=1150, Location_Description='RESIDENCE', Mont