## Import packages

In [1]:
from pyspark.sql import Row
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql.functions import col
import sys
import boto3 
import pandas as pd
import numpy as np

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1543806882658_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Read file and extract a map of required columns

In [85]:
def readfile(filename):
    # datafile="s3n://nyc-tlc/trip data/green_tripdata_2018-06.csv" # has area codes
    # datafile="s3n://nyc-tlc/trip data/yellow_tripdata_2015-07.csv" # has lat/long data
    rdd = sc.textFile(datafile).map(lambda line: line.split(",")).filter(lambda line: len(line)>1)
    col_names = rdd.take(1)[0]
    column_map = {}
    useful_columns = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "pickup_datetime", "dropoff_datetime", "dolocationid", "pulocationid", "fare_amount", "trip_distance", "passenger_count"]

    for i, c in enumerate(col_names):
        c = c.lower()
        for u in useful_columns:
            if c in u or u in c:
                column_map[u] = i

    print("Found attributes - {}".format(column_map.keys()))
    return rdd, column_map

VBox()

## Sanity check - all the required columns should be present

In [86]:
def sanityCheck(column_map):
    required_columns = ["pickup_datetime", "dropoff_datetime", "fare_amount", "trip_distance", "passenger_count"]
    for c in required_columns:
        if c not in column_map:
            print("Required column {} not found in the data. Exiting.".format(c))
            return False

    exit = False
    for c in ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]:
        if c not in column_map:
            exit = True

    if exit:
        for c in ["dolocationid", "pulocationid"]:
            if c not in column_map:
                print("Required columns for location not found in the data. Exiting.".format(c))
                return False

    print("Sanity check complete. Data can be preprocessed.")
    return True


VBox()

## Create Dataframes from RDDs - cast them to proper type

In [87]:
def createDF(rdd, column_map):
    # remove headers
    header = rdd.first()
    rdd = rdd.filter(lambda line: line != header)
    
    # create df from rdd
    if 'dolocationid' in column_map:
        df = rdd.map(lambda line: Row(pulocationid=line[column_map['pulocationid']], 
                              dolocationid=line[column_map['dolocationid']], 
                              pickup_datetime=line[column_map['pickup_datetime']], 
                              dropoff_datetime=line[column_map['dropoff_datetime']],
                              trip_distance=line[column_map['trip_distance']], 
                              fare_amount=line[column_map['fare_amount']], 
                              passenger_count=line[column_map['passenger_count']])).toDF()
        
        # assign correct type for columns
        df = df.withColumn("dolocationid", df["dolocationid"].cast(IntegerType()))
        df = df.withColumn("fare_amount",df["fare_amount"].cast(FloatType()))
        df = df.withColumn("passenger_count", df["passenger_count"].cast(IntegerType()))
        df = df.withColumn("pulocationid", df["pulocationid"].cast(IntegerType()))
        df = df.withColumn("trip_distance", df["trip_distance"].cast(FloatType()))

    else:
        df = rdd.map(lambda line: Row(pickup_longitude=line[column_map['pickup_longitude']], 
                              pickup_latitude=line[column_map['pickup_latitude']], 
                              dropoff_longitude=line[column_map['dropoff_longitude']], 
                              dropoff_latitude=line[column_map['dropoff_latitude']], 
                              pickup_datetime=line[column_map['pickup_datetime']], 
                              dropoff_datetime=line[column_map['dropoff_datetime']], 
                              trip_distance=line[column_map['trip_distance']], 
                              fare_amount=line[column_map['fare_amount']], 
                              passenger_count=line[column_map['passenger_count']])).toDF()
        df = df.withColumn("dropoff_longitude", df["dropoff_longitude"].cast(FloatType()))
        df = df.withColumn("dropoff_latitude", df["dropoff_latitude"].cast(FloatType()))
        df = df.withColumn("fare_amount",df["fare_amount"].cast(FloatType()))
        df = df.withColumn("passenger_count", df["passenger_count"].cast(IntegerType()))
        df = df.withColumn("pickup_longitude", df["pickup_longitude"].cast(FloatType()))
        df = df.withColumn("pickup_latitude", df["pickup_latitude"].cast(FloatType()))
        df = df.withColumn("trip_distance", df["trip_distance"].cast(FloatType()))
    return df

VBox()

## Feature engineering - add zipcode

In [88]:
def addZipCode(df, column_map):
    if 'dolocationid' in column_map:
        from itertools import chain
        from pyspark.sql.functions import create_map, lit
        # this map has been generated separately, stored here for ease of access
        zipcodeMap = {1: 7114, 2: 11430, 3: 10469, 4: 10009, 5: 10309, 6: 10305, 7: 11101, 8: 11105, 9: 11358, 10: 11434, 11: 11214, 12: 10280, 13: 10280, 14: 11209, 15: 11359, 16: 11361, 17: 10506, 18: 10458, 19: 11426, 20: 14813, 21: 11206, 22: 11207, 23: 10314, 24: 12913, 25: 11201, 26: 11212, 27: 11697, 28: 11435, 29: 11235, 30: 11693, 31: 10453, 32: 10462, 33: 11201, 34: 11251, 35: 11212, 36: 11221, 37: 11237, 38: 11411, 39: 11234, 40: 11231, 41: 10026, 42: 10027, 43: 10019, 44: 10309, 45: 10013, 46: 10464, 47: 10457, 48: 10001, 49: 11205, 50: 10002, 51: 10475, 52: 11201, 53: 11356, 54: 11231, 55: 11224, 56: 11368, 57: 11368, 58: 10465, 59: 10457, 60: 10459, 61: 11238, 62: 11205, 63: 11208, 64: 11363, 65: 11201, 66: 11201, 67: 11228, 68: 10019, 69: 10451, 70: 11369, 71: 11203, 72: 11236, 73: 11355, 74: 10035, 75: 10029, 76: 10003, 77: 11207, 78: 10457, 79: 10211, 80: 11211, 81: 10466, 82: 11373, 83: 11378, 84: 10308, 85: 11226, 86: 11691, 87: 10005, 88: 10006, 89: 11226, 90: 10010, 91: 11239, 92: 11355, 93: 11368, 94: 10468, 95: 11375, 96: 11385, 97: 11205, 98: 11365, 99: 10312, 100: 10018, 101: 11004, 102: 11385, 103: 10012, 104: 10012, 105: 10012, 106: 11215, 107: 10016, 108: 11223, 109: 10308, 110: 10306, 111: 11232, 112: 11222, 113: 10012, 114: 10013, 115: 10301, 116: 10031, 117: 11692, 118: 10314, 119: 10452, 120: 10034, 121: 11366, 122: 11423, 123: 11229, 124: 11414, 125: 10006, 126: 10474, 127: 10034, 128: 10031, 129: 11372, 130: 11412, 131: 11423, 132: 11430, 133: 11218, 134: 11415, 135: 11367, 136: 10468, 137: 10016, 138: 11371, 139: 11413, 140: 10021, 141: 10065, 142: 10023, 143: 10024, 144: 10013, 145: 11101, 146: 11101, 147: 10459, 148: 10009, 149: 11201, 150: 11235, 151: 10025, 152: 10027, 153: 10463, 154: 11234, 155: 11234, 156: 10303, 157: 11378, 158: 10014, 159: 10456, 160: 11379, 161: 10018, 162: 10022, 163: 10018, 164: 10017, 165: 11230, 166: 10027, 167: 10456, 168: 10451, 169: 10453, 170: 10010, 171: 11354, 172: 10306, 173: 11368, 174: 10467, 175: 11364, 176: 10306, 177: 11233, 178: 11230, 179: 11103, 180: 11416, 181: 11215, 182: 10462, 183: 10461, 184: 10461, 185: 10461, 186: 10016, 187: 10302, 188: 11225, 189: 11238, 190: 11215, 191: 11427, 192: 11355, 193: 11101, 194: 10035, 195: 11231, 196: 11374, 197: 11418, 198: 11385, 199: 11370, 200: 10471, 201: 11694, 202: 10044, 203: 11422, 204: 10309, 205: 11412, 206: 11378, 207: 11370, 208: 10469, 209: 10013, 210: 11235, 211: 10013, 212: 10472, 213: 10472, 214: 10305, 215: 11435, 216: 11420, 217: 11221, 218: 11413, 219: 11413, 220: 10463, 221: 10304, 222: 11239, 223: 11105, 224: 10009, 225: 11205, 226: 11104, 227: 11211, 228: 11232, 229: 10022, 230: 10036, 231: 10007, 232: 10002, 233: 10022, 234: 10003, 235: 10453, 236: 10021, 237: 10028, 238: 10044, 239: 10065, 240: 10463, 241: 10463, 242: 10461, 243: 10032, 244: 10034, 245: 10310, 246: 10036, 247: 10451, 248: 10460, 249: 10014, 250: 10462, 251: 10314, 252: 11357, 253: 11357, 254: 10467, 255: 10467, 256: 10467, 257: 11215, 258: 11421, 259: 10470, 260: 11377, 261: 10048, 262: 10028, 263: 10028, 264: '', 265: ''}
        mapping_expr = create_map([lit(x) for x in chain(*zipcodeMap.items())])
        df = df.withColumn("dropoff_zipcode", mapping_expr[df["dolocationid"]])
        df = df.withColumn("pickup_zipcode", mapping_expr[df["pulocationid"]])
        df = df.drop("dolocationid", "pulocationid")
    else:
        from uszipcode import SearchEngine
        from uszipcode import Zipcode
        from pyspark.sql.functions import udf, array
        import numpy as np

        search = SearchEngine(simple_zipcode=True)
        def get_zip_code(latitude,longitude):
            try:
                search = SearchEngine(simple_zipcode=True)
                result = search.by_coordinates(latitude, longitude, radius=5, returns=1)
                return result[0].zipcode
            except ValueError as e:
                return 10001

        for index, row in df.iterrows():
            lat = row['pickup_latitude']
            lng = row['pickup_longitude']
            zipcode = get_zip_code(lat,lng)
            df.at[index,'zipcode'] = zipcode.zipcode
    return df

VBox()

## Remove NaN values

In [89]:
def remove_nan_values(df):
    num_columns = set(['dolocationid', 'fare_amount', 'passenger_count', 'pulocationid', 'trip_distance'])
    for column in df.columns:
        df = df.filter(col(column).isNotNull())
        if column in num_columns:
            df = df.filter(col(column) > 0.0)
            print str(column)+ "-->" + str(df.count())
    return df

VBox()

## Remove invalid latitude/longitude

In [90]:
def remove_invalid_lat_long(df):
    max_lat = 40.917577
    min_lat = 40.477399 
    max_long = -73.700272 
    min_long = -74.259090
    loc_cols = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"]
    print loc_cols
    lat_cols = []
    long_cols = []
    for column in loc_cols:
        if "latitude" in column.lower():
            lat_cols.append(column)
        elif "longitude" in column.lower():
            long_cols.append(column)
#     print lat_cols
#     print long_cols
            
    for column in lat_cols:
        df = df.filter(col(column).between(min_lat, max_lat))
#         print "lat : "+ str(column) + "-->" + str(df.count())
#         print ("lat : {} ---> {}".format(column, df.count()))
    for column in long_cols:
        df = df.filter(col(column).between(min_long, max_long))
#         print ("long : {} ---> {}".format(column, df.count()))
    return df

VBox()

## Remove trips with invalid fare

In [91]:
def remove_invalid_fare_trips(df):
    df = df.filter(col('fare_amount') >= 2.5)
    return df

VBox()

## Handler function date preprocessing

In [92]:
nPartitions = 2 # set this to the number of workers you have
outputDir = "s3n://dic-taxi-fare-prediction/data"
def handler(filenames):
    for filename in filenames:
        try:
            rdd, column_map = readfile(filename)
            if not sanityCheck(column_map):
                continue
            df1 = createDF(rdd, column_map)
            df2 = addZipCode(df1, column_map)
            df3 = remove_nan_values(df2)
            if "pickup_longitude" in column_map:
                df3 = remove_invalid_lat_long(df3)
            df4 = remove_invalid_fare_trips(df3)
            df4.write.save(outputDir, format='csv', header=True)
        except: # until the lat/long logic is fixed
            continue
    return [1]
#     newRdd = df3.rdd.map(list)
#     newRdd.saveAsTextFile("s3n://dic-taxi-fare-prediction/data")

# newRdd = handler(rdd)

VBox()

## Read files names to be processed

In [93]:
bucket = 'nyc-tlc'
prefix = "trip data"
trip_type = "green_tripdata"

s3 = boto3.client("s3")
objects = s3.list_objects(Bucket=bucket, Prefix=prefix)
tripdata_files = []
# print(objects["ResponseMetadata"]["HTTPStatusCode"])
if objects["ResponseMetadata"] and objects["ResponseMetadata"]["HTTPStatusCode"] == 200:
    for obj in objects['Contents']:
        if "Key" in obj and trip_type in obj["Key"]:
            tripdata_files.append(obj["Key"])
    print("Found {} under path s3://{}/{} for {}".format(len(tripdata_files), bucket, prefix, trip_type))
else:
    print("Could not read files under s3://{}/{}. Exiting.".format(bucket, prefix))
    sys.exit(1)

VBox()

Found 59 under path s3://nyc-tlc/trip data for green_tripdata

## Parallelize file processing 

In [94]:
# handler(rdd, column_map)
def test(filenames):
    import socket
    import boto3
    import random
    hostname = socket.gethostname()
    bucket = 'atambol'
    s3 = boto3.client("s3")
    for filename in filenames:
        key = "/".join([hostname, filename, str(random.randint(1,10001))])
        s3.put_object(Bucket=bucket, Key=key, Body=str.encode("h"))
        
    return [1]
        
# sc.parallelize(tripdata_files, nPartitions).map(lambda element: test(element)).collect()
''' this works on one node with orrect file count
rdd = sc.parallelize(tripdata_files, 2)
# rdd.count()
rdd.mapPartitions(test).count()
# dir(rdd)
'''
rdd = sc.parallelize(tripdata_files, 16) # 16 = number total cores of workers in the cluster

VBox()

In [95]:
# rdd.count()
rdd.mapPartitions(handler).count()
# rdd.getNumPartitions()
# dir(rdd)
# print("Default parallelism: {}".format(sc.defaultParallelism))
# print("Number of partitions: {}".format(rdd.getNumPartitions()))
# print("Partitioner: {}".format(rdd.partitioner))
# print("Partitions structure: {}".format(rdd.glom().collect()))

VBox()

Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1053, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1044, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 915, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 814, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2472, in _jrdd
    self._jrdd_deseri

In [59]:
tripdata_files


VBox()

An error was encountered:
Session 1 did not reach idle status in time. Current status is busy.
