# Random Forest

## Spark

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1280px-Apache_Spark_logo.svg.png" width="400">

**Hardware**: 20 nodes, each an r5.2xlarge (8 vCPUs, 64 GiB RAM)

# Load data

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("pyspark-rf-benchmark")
         # .master("spark://ecs-python2:7077")  # earlier master URL; the later .master() call overrode it
         .master("spark://10.50.0.194:7077")
         .config('spark.executor.memory', '16G')
         .config('spark.driver.memory', '16G')
         .config('spark.driver.maxResultSize', '16G')
         .getOrCreate())

spark

22/12/29 00:55:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
#import s3fs
import functools
from pyspark.sql.types import (DoubleType, StringType, StructField,
                               StructType, TimestampType)
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

In [3]:
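# Explicit schema for the CSV version of this dataset (inferSchema in read.csv is quite slow).
# It is not used below: the Parquet file read in the next cell carries its own schema.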
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    StructField('tpep_dropoff_datetime', TimestampType()),
    StructField('passenger_count', DoubleType()),
    StructField('trip_distance', DoubleType()),
    StructField('RateCodeID', DoubleType()),
    StructField('store_and_fwd_flag', StringType()),
    #StructField('PULocationID', DoubleType()),
    #StructField('DOLocationID', DoubleType()),
    StructField('pickup_longitude', DoubleType()),
    StructField('pickup_latitude', DoubleType()), 
    StructField('dropoff_longitude', DoubleType()), 
    StructField('dropoff_latitude', DoubleType()),
    StructField('payment_type', DoubleType()),
    StructField('fare_amount', DoubleType()),
    StructField('extra', DoubleType()),
    StructField('mta_tax', DoubleType()),
    StructField('tip_amount', DoubleType()),
    StructField('tolls_amount', DoubleType()),
    StructField('improvement_surcharge', DoubleType()),
    StructField('total_amount', DoubleType()),
    StructField('congestion_surcharge', DoubleType()),
])

In [4]:
#path = "/rapids/notebooks/host/dataset/nyc-taxi/yellow_tripdata_2015.parquet"
path = "/home/cloud/dataset/nyc-taxi/yellow_tripdata_2015.parquet"
df = spark.read.parquet(path)

In [5]:
%%time
taxi = df
print(f"{taxi.count(): }")

 146112989
CPU times: user 5.14 ms, sys: 900 µs, total: 6.04 ms
Wall time: 4.43 s


# Feature engineering

In [6]:
taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))
taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))
taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))
# Spark ML estimators read the target from a column named "label" by default
taxi = taxi.withColumn('label', taxi.total_amount)
taxi = taxi.fillna(-1)
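
To make the `pickup_week_hour` encoding concrete, here is a minimal pure-Python sketch of the same arithmetic for a single timestamp. The helper names `spark_dayofweek` and `pickup_week_hour` are hypothetical and not part of the notebook. The sketch assumes Spark's `dayofweek` convention of 1 = Sunday through 7 = Saturday.

```python
from datetime import datetime


def spark_dayofweek(ts: datetime) -> int:
    """Mirror Spark's F.dayofweek: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so rotate it by one day.
    return ts.isoweekday() % 7 + 1


def pickup_week_hour(ts: datetime) -> float:
    """Hypothetical helper reproducing (pickup_weekday * 24) + pickup_hour."""
    return float(spark_dayofweek(ts) * 24 + ts.hour)


# 2015-01-01 was a Thursday, which Spark numbers as day 5.
example = datetime(2015, 1, 1, 13, 45)
print(spark_dayofweek(example), pickup_week_hour(example))
```

Because the weekday starts at 1 rather than 0, the feature ranges from 24 (Sunday, first hour of the day) to 191 (Saturday, last hour of the day).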

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID', 
            'RateCodeID', 'store_and_fwd_flag', 'pickup_longitude', 'pickup_latitude', 
            'dropoff_longitude', 'dropoff_latitude']

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])

In [8]:
%%time
assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)

X.cache()
X.count()

22/12/29 00:55:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

CPU times: user 26.1 ms, sys: 879 µs, total: 27 ms
Wall time: 38.8 s


146112989

# Train random forest!

In [9]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)

In [10]:
%%time
fitted = rf.fit(X)

22/12/29 01:05:24 WARN DAGScheduler: Broadcasting large task binary with size 1049.3 KiB
22/12/29 01:08:16 WARN DAGScheduler: Broadcasting large task binary with size 2024.5 KiB
22/12/29 01:11:25 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
22/12/29 01:14:41 WARN DAGScheduler: Broadcasting large task binary with size 1242.6 KiB
22/12/29 01:14:43 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
22/12/29 01:18:35 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/12/29 01:18:37 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB
22/12/29 01:23:26 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB

CPU times: user 361 ms, sys: 145 ms, total: 506 ms
Wall time: 26min 56s
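
For comparing this run against other frameworks, it can help to turn the `%%time` wall times into seconds and a rough throughput. The sketch below is one way to do that. `wall_time_seconds` is a hypothetical helper, not part of the notebook, and it only assumes the duration formats that appear in the outputs above (for example `4.43 s` and `26min 56s`).

```python
import re

# Seconds per unit for the tokens that %%time prints.
_UNIT_SECONDS = {"h": 3600.0, "min": 60.0, "s": 1.0,
                 "ms": 1e-3, "µs": 1e-6, "us": 1e-6, "ns": 1e-9}

# Longer unit names come first so "min" and "ms" are not read as "s".
_TOKEN = re.compile(r"(\d+(?:\.\d+)?)\s*(min|ms|µs|us|ns|h|s)")


def wall_time_seconds(text: str) -> float:
    """Convert a %%time duration such as '26min 56s' or '4.43 s' to seconds."""
    return sum(float(value) * _UNIT_SECONDS[unit]
               for value, unit in _TOKEN.findall(text))


rows = 146_112_989                            # row count printed by the notebook
fit_seconds = wall_time_seconds("26min 56s")  # wall time of rf.fit(X)
print(f"fit: {fit_seconds:.0f} s, about {rows / fit_seconds:,.0f} rows/s")
```

With the numbers reported here, training works out to roughly 90,000 rows per second across the cluster. This counts only the `rf.fit` call, not the 38.8 s spent assembling and caching the features.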


In [12]:
# `sc` was never defined in this notebook; stop the session created in the first cell instead
spark.stop()