### Import PySpark
Nogle få kommentarer:
 - Download Spark pre-built til Hadoop 2.6. Jeg vil også anbefale jer at bruge Spark 1.6.0, da der er nogle problemer med 1.6.1 ([hent den her](http://www.apache.org/dyn/closer.lua/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz)).
 - husk at ændre paths i denne notebook
 - `os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.databricks:spark-avro_2.10:2.0.1 pyspark-shell"` giver jer mulighed for at loade Avro-filer direkte

In [None]:
import sys
import os
import os.path

# Root of the unpacked Spark distribution -- adjust this path for your machine.
SPARK_HOME = """spark-1.6.0-bin-hadoop2.6/"""

# Put the Py4J and PySpark archives found under SPARK_HOME on the import path.
sys.path.extend(
    os.path.join(SPARK_HOME, "python", "lib", archive)
    for archive in ("py4j-0.9-src.zip", "pyspark.zip")
)

# Environment for launching PySpark: the Spark location, the submit arguments
# (which pull in the spark-avro package) and the Python interpreter to use.
os.environ.update({
    "SPARK_HOME": SPARK_HOME,
    "PYSPARK_SUBMIT_ARGS": "--packages com.databricks:spark-avro_2.10:2.0.1 pyspark-shell",
    "PYSPARK_PYTHON": "/usr/bin/python3",
})


from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ArrayType

# Configure Spark with the "local[*]" master and the application name "My app".
conf = SparkConf().setAppName("My app").setMaster("local[*]")
sc = SparkContext(conf=conf)
# SQL entry point; used below to read the Avro files into a DataFrame.
sqlContext = SQLContext(sc)

In [None]:
# Read every file matching data/201512/*.avro into one DataFrame
# through the spark-avro data source.
df = sqlContext.read.format("com.databricks.spark.avro").load("data/201512/*.avro")

In [None]:
# Print the DataFrame schema to inspect the structure of the data.
df.printSchema()

In [None]:
# Take a look at the data: show the first 10 rows.
df.show(10)

In [None]:
# Alias the loaded DataFrame as `data`; the following cells use this name.
data = df

In [None]:
# Count the rows in the DataFrame.
data.count()

In [None]:
from dateutil import parser
import math

# (min_lng, max_lng, min_lat, max_lat) of the gridded area, in degrees.
grid_boundaries_tuple=(-180, 180, -90, 90)
# Decimals kept per coordinate; coordinates are truncated to this precision.
spatial_resolution_decimals = 3

# Grid boundaries shifted to be non-negative and scaled to integer cell indices.
GRID_MIN_LNG = (grid_boundaries_tuple[0] + 180) * pow(10,spatial_resolution_decimals)
GRID_MAX_LNG = (grid_boundaries_tuple[1] + 180) * pow(10,spatial_resolution_decimals)
GRID_MIN_LAT = (grid_boundaries_tuple[2] + 90) * pow(10,spatial_resolution_decimals)
GRID_MAX_LAT = (grid_boundaries_tuple[3] + 90) * pow(10,spatial_resolution_decimals)

# Number of distinct longitude indices in one latitude row (both ends
# inclusive). This is the row stride of the flattened grid.
_GRID_LNG_CELLS = abs(GRID_MAX_LNG - GRID_MIN_LNG) + 1


def calculate_spatial_bin(lng, lat):
    """Map a (longitude, latitude) pair to a unique integer grid-cell id.

    Both coordinates are shifted to be non-negative, scaled by
    10 ** spatial_resolution_decimals and truncated to integer indices.
    The cell id is the row-major index ``lat_index * row_width + lng_index``.

    The row width is the number of longitude cells. Using the latitude
    extent as the stride (as was done previously) makes different cells
    share an id, because a longitude index can exceed that stride.

    Args:
        lng: Longitude in degrees.
        lat: Latitude in degrees.

    Returns:
        The integer id of the grid cell containing the point. Ids can be
        far larger than 2**31, so store them in a 64-bit column.
    """
    scale = pow(10, spatial_resolution_decimals)
    lat_index = math.trunc((lat + 90.0) * scale)
    lng_index = math.trunc((lng + 180.0) * scale)
    return (_GRID_LNG_CELLS * (lat_index - GRID_MIN_LAT)) + (lng_index - GRID_MIN_LNG)

def calculate_time_bins(start_time, end_time=None):
    """Return the hourly time bins covered by a record.

    Bins are whole hours counted from '2015-08-09 00:00:00+02'. The start
    is rounded down and the end is rounded up to a whole hour; the end bin
    itself is excluded from the result.

    Args:
        start_time: Timestamp string understood by ``dateutil.parser.parse``.
        end_time: Optional timestamp string; when missing or empty only the
            start bin is returned.

    Returns:
        A list of integer bin indices. It holds a single element when no
        end is given or when start and end fall in the same bin, and it is
        empty when the end bin lies before the start bin.
    """
    started_at = parser.parse(start_time)
    origin = parser.parse('2015-08-09 00:00:00+02')
    first_bin = math.floor(((started_at-origin).total_seconds()/60.0)/60)

    if not end_time:
        return [first_bin]

    ended_at = parser.parse(end_time)
    last_bin = math.ceil(((ended_at-origin).total_seconds()/60.0)/60)
    if first_bin == last_bin:
        return [first_bin]
    return list(range(first_bin, last_bin))

# Start and end timestamps (UTC) of the three consecutive periods used below.
# NOTE(review): the third period ends on 2015-12-29 here, while
# third_period_max_bin further down is computed from 2015-12-31 -- confirm
# which end date is intended.
first_period_min_date = parser.parse("2015-12-01 00:00:00+00:00")
first_period_max_date = parser.parse("2015-12-09 23:59:59+00:00")
second_period_min_date = parser.parse("2015-12-10 00:00:00+00:00")
second_period_max_date = parser.parse("2015-12-19 23:59:59+00:00")
third_period_min_date = parser.parse("2015-12-20 00:00:00+00:00")
third_period_max_date = parser.parse("2015-12-29 23:59:59+00:00")

In [None]:
# Keep only Swedish records that start no earlier than the first period
# and end no later than the third period.
swe_data = (
    data
    .filter(data["country"] == 'Sweden')
    .filter(data["start_time"] >= first_period_min_date)
    .filter(data["end_time"] <= third_period_max_date)
)
swe_data.show(1)

In [None]:
# Print the schema of the filtered (Sweden-only) DataFrame.
swe_data.printSchema()

In [None]:
# Spatial bin ids reach tens of billions (latitude index times a row stride
# of at least 180000), far above the 32-bit maximum of 2147483647. The column
# is therefore declared as LongType; IntegerType cannot hold these values.
udf_spatial_bin = udf(calculate_spatial_bin, LongType())
binned_swe_data = swe_data.withColumn("spatial_bin", udf_spatial_bin("longitude", "latitude"))
# Hourly bin indices covered by each record, as an array column.
udf_time_bins = udf(calculate_time_bins, ArrayType(IntegerType()))
binned_swe_data = binned_swe_data.withColumn("time_bins", udf_time_bins("start_time", "end_time"))

In [None]:
# Show the record with the earliest start_time together with its computed time bins.
binned_swe_data.select(binned_swe_data['start_time'], binned_swe_data['end_time'], binned_swe_data['time_bins']).sort('start_time').show(1)

In [None]:
import pyspark.sql.functions
# Divide the data into the three periods.
def _bin_of(timestamp):
    """Return the first time bin computed for `timestamp`."""
    return calculate_time_bins(timestamp)[0]


def _locations_within(period_start, period_end):
    """Return the records starting at/after `period_start` and ending before `period_end`."""
    return (binned_swe_data
            .filter(binned_swe_data["start_time"] >= period_start)
            .filter(binned_swe_data["end_time"] < period_end))


# Time-bin index of each period boundary.
# NOTE(review): the third period's max bin uses 2015-12-31, whereas
# third_period_max_date is 2015-12-29 -- confirm which is intended.
first_period_min_bin = _bin_of("2015-12-01 00:00:00+00:00")
first_period_max_bin = _bin_of("2015-12-09 23:59:59+00:00")
second_period_min_bin = _bin_of("2015-12-10 00:00:00+00:00")
second_period_max_bin = _bin_of("2015-12-19 23:59:59+00:00")
third_period_min_bin = _bin_of("2015-12-20 00:00:00+00:00")
third_period_max_bin = _bin_of("2015-12-31 23:59:59+00:00")


period_1_locations = _locations_within(first_period_min_date, first_period_max_date)
period_2_locations = _locations_within(second_period_min_date, second_period_max_date)
period_3_locations = _locations_within(third_period_min_date, third_period_max_date)

In [None]:
# Remove duplicates and build, per period, the mapping
# (spatial_bin, time_bin) -> [users] that the following cells consume.
def _distinct_visits(locations):
    """Return the distinct (useruuid, spatial_bin, time_bin) rows of `locations`.

    The `time_bins` array is exploded so that every covered time bin
    yields its own row.
    """
    return locations.select(
        locations.useruuid,
        locations.spatial_bin,
        pyspark.sql.functions.explode(locations.time_bins),
    ).distinct()


def _bins_to_users(locations):
    """Return an RDD of ((spatial_bin, time_bin), [useruuid, ...]).

    NOTE(review): this shape is inferred from how the later cells index the
    rows (row[0][0] as the spatial bin, row[1] as a list of users) -- confirm
    it matches the intended definition.
    """
    return (_distinct_visits(locations).rdd
            .map(lambda visit: ((visit[1], visit[2]), [visit[0]]))
            .reduceByKey(lambda a, b: a + b))


distinct = _distinct_visits(period_1_locations)
distinct.show(5,truncate=False)

# These names are used by the co-occurrence and entropy cells below but were
# never defined in the notebook, which raised a NameError.
period_1_bins_to_users = _bins_to_users(period_1_locations)
period_2_bins_to_users = _bins_to_users(period_2_locations)
period_3_bins_to_users = _bins_to_users(period_3_locations)

In [None]:
# generate cooccurrences in form of: (user1,user2) -> [(spatial,time)]
from itertools import combinations
def generate_cooccurrences(row):
    """Emit one ((user_a, user_b), [bin]) entry per pair of users in a bin.

    Args:
        row: A (bin, users) pair; `users` is a sequence of user ids.

    Returns:
        A list with one entry for every 2-combination of `users`. The key
        is the pair sorted into a tuple and the value is a one-element
        list holding the row's bin.
    """
    bin_key, users = row[0], row[1]
    cooccurrences = []
    for pair in combinations(users, 2):
        cooccurrences.append((tuple(sorted(pair)), [bin_key]))
    return cooccurrences
    
def _cooccurrences_by_pair(bins_to_users):
    """Collect, for every user pair, the list of bins in which both users appear."""
    pair_entries = bins_to_users.flatMap(generate_cooccurrences)
    return pair_entries.reduceByKey(lambda a,b: a+b)


coocs_1 = _cooccurrences_by_pair(period_1_bins_to_users)
coocs_2 = _cooccurrences_by_pair(period_2_bins_to_users)
coocs_3 = _cooccurrences_by_pair(period_3_bins_to_users)

In [None]:
# Peek at one (user pair) -> [bins] entry of the first period.
coocs_1.take(1)

In [None]:
import numpy as np
# generate location entropies (H_l) for use in weighted frequency in form key:spatial_bin, val: H_l
def calculate_H(row):
    """Compute the location entropy H_l of one spatial bin.

    Args:
        row: A (spatial_bin, users) pair; `users` is a list with one entry
            per recorded visit, so a user may appear several times.

    Returns:
        A (spatial_bin, H_l) tuple, where H_l = -sum(P_ul * log2(P_ul))
        over the distinct users and P_ul is the user's share of the visits.
        An empty user list yields 0.
    """
    # Local import keeps the function self-contained.
    from collections import Counter

    total_visits = len(row[1])
    H_val = 0
    # One counting pass instead of calling list.count() per distinct user,
    # which was quadratic for bins with many visitors.
    for visits in Counter(row[1]).values():
        P_ul = visits/total_visits
        H_val += P_ul*np.log2(P_ul)
    return row[0],-H_val

def _location_entropies(bins_to_users):
    """Return a dict mapping each spatial bin to its location entropy H_l."""
    # Re-key every row by its spatial bin (row[0][0]) and merge the user lists.
    users_by_location = (bins_to_users
                         .map(lambda row: (row[0][0],row[1]))
                         .reduceByKey(lambda a, b: a+b))
    return users_by_location.map(calculate_H).collectAsMap()


period_1_h_vals = _location_entropies(period_1_bins_to_users)
period_2_h_vals = _location_entropies(period_2_bins_to_users)

In [None]:
from pyspark.mllib.regression import LabeledPoint

import numpy as np

# User pairs that co-occur in the next period; used as the positive labels.
# Stored as sets so the per-row `in` checks in compute_train_features and
# compute_test_features are hash lookups instead of scans over a list.
y_1_users = set(coocs_2.map(lambda row: row[0]).collect())
y_2_users = set(coocs_3.map(lambda row: row[0]).collect())

def compute_weighted_frequency(row, h_vals):
    """Sum the co-occurrence counts per spatial bin, each weighted by exp(-H_l).

    Args:
        row: A (key, cooccurrences) pair; the first element of every
            co-occurrence is its spatial bin.
        h_vals: Mapping from spatial bin to its entropy value H_l.

    Returns:
        The sum over distinct spatial bins of count * exp(-h_vals[bin]);
        0 when there are no co-occurrences.
    """
    visited = [cooc[0] for cooc in row[1]]
    return sum(visited.count(sb)*np.exp(-h_vals[sb]) for sb in set(visited))

def compute_features(y, row, h_vals):
    """Build the labelled feature vector for one user pair.

    Args:
        y: Label to attach to the point.
        row: A (user pair, cooccurrences) entry; the first element of
            every co-occurrence is its spatial bin.
        h_vals: Mapping from spatial bin to entropy, used for the
            weighted frequency.

    Returns:
        A LabeledPoint with the features [number of co-occurrences,
        number of distinct spatial bins, diversity, weighted frequency].
    """
    spatial_bins = [cooc[0] for cooc in row[1]]
    distinct_bins = set(spatial_bins)
    total = len(spatial_bins)

    # Plain counts: all co-occurrences and the distinct spatial bins among them.
    num_coocs = len(row[1])
    num_unique_coocs = len(distinct_bins)

    weighted_frequency = compute_weighted_frequency(row, h_vals)

    # Diversity: entropy (base 2) of the distribution of co-occurrences over bins.
    entropy_terms = []
    for sb in distinct_bins:
        share = spatial_bins.count(sb)/total
        entropy_terms.append(share*np.log2(share))
    diversity = -np.sum(entropy_terms)

    return LabeledPoint(y, [num_coocs, num_unique_coocs, diversity, weighted_frequency])

def compute_train_features(row):
    """Return the training point for `row`, labelled 1 if its key is in y_1_users, else 0."""
    is_positive = row[0] in y_1_users
    return compute_features(int(is_positive), row, period_1_h_vals)
def compute_test_features(row):
    """Return the test point for `row`, labelled 1 if its key is in y_2_users, else 0."""
    is_positive = row[0] in y_2_users
    return compute_features(int(is_positive), row, period_2_h_vals)

# Training set: one labelled point per user pair in coocs_1.
X_train = coocs_1.map(compute_train_features)

# Test set: one labelled point per user pair in coocs_2.
X_test = coocs_2.map(compute_test_features)

In [None]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

# Train a random forest classifier on the training points.
model = RandomForest.trainClassifier(
    X_train,
    numClasses=2,
    categoricalFeaturesInfo={},
    numTrees=100,
    featureSubsetStrategy="auto",
    impurity='gini',
    maxDepth=4,
    maxBins=32,
)

# Predict on the test features and pair every prediction with its label.
predictions = model.predict(X_test.map(lambda point: point.features))
labels = X_test.map(lambda point: point.label)
predictionAndLabels = predictions.zip(labels)

# Report the area under the ROC curve.
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = {}".format(metrics.areaUnderROC))