In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cloudanum').getOrCreate()

In [2]:
stages = []
# Load training data
df = spark.read.csv('./datasets/taxi2.csv',inferSchema=True,header=True)

In [3]:
df.columns

['pickup_datetime',
 'dropoff_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count',
 'trip_distance',
 'payment_type',
 'fare_amount',
 'tip_amount',
 'tolls_amount',
 'total_amount']

In [4]:
df.createOrReplaceTempView("main")

In [6]:
data=spark.sql("SELECT payment_type, Count(*) AS COUNT,AVG(fare_amount), AVG(tip_amount) AS AverageFare from main GROUP BY payment_type")
data.show()

+------------+-----+------------------+-----------------+
|payment_type|COUNT|  avg(fare_amount)|      AverageFare|
+------------+-----+------------------+-----------------+
|         CRD|10000|32.384988999999784| 7.61713200000006|
|         Cas| 3080| 34.64730519480518|7.497457792207749|
+------------+-----+------------------+-----------------+



In [7]:
data=spark.sql("SELECT passenger_count,Count(*) AS COUNT,AVG(fare_amount), AVG(tip_amount) AS AverageFare from main GROUP BY passenger_count")
data.show()

+---------------+-----+------------------+-----------------+
|passenger_count|COUNT|  avg(fare_amount)|      AverageFare|
+---------------+-----+------------------+-----------------+
|              1| 8489| 33.24972199316738|7.647828955118439|
|              6|  519| 31.75356454720615|7.487167630057794|
|              3|  513| 32.66569200779726|7.573684210526312|
|              5| 1342|31.179731743666245|7.306810730253361|
|              4|  248|31.752016129032274|7.552782258064514|
|              2| 1968| 33.17144308943094|7.561534552845541|
|              0|    1|              70.0|             10.0|
+---------------+-----+------------------+-----------------+



In [8]:
from pyspark.ml.feature import StringIndexer

In [9]:
label_payment_type_indx = StringIndexer(inputCol = "payment_type", outputCol = "label")
indexed = label_payment_type_indx.fit(df).transform(df)

In [10]:
print([label_payment_type_indx])

[StringIndexer_f9fbbb568fe9]


In [11]:
data.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- COUNT: long (nullable = false)
 |-- avg(fare_amount): double (nullable = true)
 |-- AverageFare: double (nullable = true)



In [12]:
data.head()

Row(passenger_count=1, COUNT=8489, avg(fare_amount)=33.24972199316738, AverageFare=7.647828955118439)

In [13]:
data.columns

['passenger_count', 'COUNT', 'avg(fare_amount)', 'AverageFare']

In [14]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler,StringIndexer

In [15]:
paymentIndexer = StringIndexer(inputCol="payment_type", outputCol="payment_typeIndx").fit(df)

In [16]:
assembler = VectorAssembler(
  inputCols=[
 
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'passenger_count',
      'paymentIndexer'
],
              outputCol="features")