In [1]:
from pyspark.sql import SparkSession, Row, Column
from pyspark.sql.functions import sqrt
from collections import OrderedDict

# Build the notebook's SparkSession, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [2]:
# Load the joined transaction/customer CSV: the first line is a header, column
# types are inferred, and rows that fail to parse are dropped.
transactions = (
    spark.read
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .csv("transaction_customer_join.csv")
)
# Display how many partitions back the loaded data.
transactions.rdd.getNumPartitions()

7

In [12]:
# Count the transactions that have no longitude recorded.
transactions.where(transactions["longitude"].isNull()).count()

5548

In [4]:
import math
EARTH_RADIUS = 6371 * 1000.0 # metres


def haversine(lat1, long1, lat2, long2):
    """Return the great-circle distance, in metres, between two points.

    All four arguments are angles in DEGREES; they are converted to radians
    internally before the haversine formula is applied.

    Args:
        lat1, long1: latitude and longitude of the first point (degrees).
        lat2, long2: latitude and longitude of the second point (degrees).

    Returns:
        The distance along the sphere of radius EARTH_RADIUS, as a float.

    Raises:
        TypeError: if any coordinate is not a real number (e.g. None).
    """
    lat1_rad, long1_rad = math.radians(lat1), math.radians(long1)
    lat2_rad, long2_rad = math.radians(lat2), math.radians(long2)

    delta_lat = lat2_rad - lat1_rad
    delta_long = long2_rad - long1_rad

    a = (math.sin(delta_lat / 2)**2) + (math.cos(lat1_rad) * math.cos(lat2_rad) * (math.sin(delta_long / 2)**2))

    # Mathematically 0 <= a <= 1, but floating-point rounding can push it a
    # hair outside that range (typically for near-antipodal points), which
    # would make math.sqrt raise "math domain error". Explicit comparisons
    # are used so that a NaN input still propagates as NaN.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0

    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))

    return EARTH_RADIUS * c

In [21]:
import datetime
# Example business targets: an age and an income.
target_age, target_income = 20, 50000.0
data_buckets = dict()

def jsonDate2date(dj):
    """Parse a timestamp string such as '2016-01-02T03:04:05.678Z'.

    The string must contain fractional seconds and end with a literal 'Z'.
    Returns a naive ``datetime.datetime`` (no tzinfo attached); raises
    ValueError if the string does not match the expected layout.
    """
    timestamp_layout = '%Y-%m-%dT%H:%M:%S.%fZ'
    return datetime.datetime.strptime(dj, timestamp_layout)

def floorDate(hour):
    """Map an hour of the day to a one-hot (morning, noon, evening) tuple.

    Buckets:
        6 <= hour <= 11   -> (1, 0, 0)  morning
        11 < hour <= 17   -> (0, 1, 0)  noon
        17 < hour < 22    -> (0, 0, 1)  evening
        anything else     -> (0, 0, 0)
    """
    # The three ranges are mutually exclusive, so at most one flag is set.
    in_morning = 6.0 <= hour <= 11.0
    in_noon = 11.0 < hour <= 17.0
    in_evening = 17.0 < hour < 22.0

    return (int(in_morning), int(in_noon), int(in_evening))

# Function called for each row of the transactions data.
# The customer fields in a row correspond to that specific transaction.


def getDataForFood(lat, lon, row, max_distance=500):
    """Classify one joined transaction row by day and time-of-day bucket.

    A transaction only counts towards a bucket when it has coordinates and
    lies within ``max_distance`` metres of the reference point (lat, lon).

    Args:
        lat, lon: reference point, in degrees.
        row: positional record laid out as
            (latitude, longitude, customerId, merchantName, currencyAmount,
             category, date, id, age, gender, totalIncome,
             relationshipStatus, pa_latitude, pa_longitude).
            Only latitude (index 0), longitude (index 1) and date (index 6)
            are read; ``date`` must expose ``.day`` and ``.hour``.
        max_distance: radius around the reference point, in metres.
            Defaults to 500.

    Returns:
        (day, morning, noon, evening): the day of the month plus the one-hot
        flags from ``floorDate``. All three flags are 0 when the transaction
        has no coordinates or is farther away than ``max_distance``.
    """
    # Read only the fields that are needed instead of unpacking all fourteen
    # columns; this also avoids shadowing the builtin ``id``.
    latitude, longitude = row[0], row[1]
    date = row[6]

    day = date.day
    hour = date.hour

    # Transactions without a location cannot be matched to the reference point.
    if latitude is None or longitude is None:
        return (day, 0, 0, 0)

    # haversine returns a non-negative distance in metres.
    distance = haversine(lat, lon, latitude, longitude)

    if distance > max_distance:
        return (day, 0, 0, 0)

    m, a, e = floorDate(hour)

    return (day, m, a, e)
        
    
    

In [22]:
# Reference point (latitude, longitude in degrees) to measure distances from.
lat, lon = 43.7167324775, -79.4562972547

# Tag every transaction relative to the reference point and bring the
# resulting (day, morning, noon, evening) tuples back to the driver.
volume = (
    transactions.rdd
    .map(lambda row: getDataForFood(lat, lon, row))
    .collect()
)

In [23]:
# Total the per-transaction flags for each time-of-day bucket; the day
# component of every tuple is ignored here.
morning = sum(m for (_, m, _, _) in volume)
afternoon = sum(a for (_, _, a, _) in volume)
evening = sum(e for (_, _, _, e) in volume)

print("Morning:  {0}".format(morning))
print("Afternoon:  {0}".format(afternoon))
print("Evening:  {0}".format(evening))

Morning:  205
Afternoon:  117
Evening:  31
