In [0]:
# Create SparkSession from builder
import pyspark
from pyspark.sql import SparkSession
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.pandas as ps
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window


def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)


from pyspark.sql.functions import monotonically_increasing_id
def get_mode(df):
    column_lst = df.columns
    res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
    df_mode = res[0].limit(1).select(column_lst[0]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
    
    for i in range(1, len(res)):
        df2 = res[i].limit(1).select(column_lst[i]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
        df_mode = df_mode.join(df2, (df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id)).drop(df2.temp_name_monotonically_increasing_id)
        
    return df_mode.drop("temp_name_monotonically_increasing_id")





spark = SparkSession.builder \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()
parquets=["/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_01.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_02.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_03.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_04.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_05.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_06.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_07.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_08.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_09.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_10.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_11.parquet",
          "/FileStore/tables/yellow_tripdata_2019/yellow_tripdata_2019_12.parquet",]
    
data=[]
for i in parquets:
    d = spark.read.option("header","true").parquet(i,inferSchema=True)
    data.append(d)
    
#Merge all the dataframes in list
df_complete=unionAll(*data)
print("Total Rows and Columns:",(df_complete.count(), len(df_complete.columns)))
columns=df_complete.columns

print("=============================================")

#Null values
print("Null values for every columns in DataFrame:")
print("=============================================")
for i in df_complete.columns:
      print(i,df_complete.count()-(df_complete.na.drop(subset=i).count()))

print("=============================================")


#Mode values for the DataFrame
mode_values=get_mode(df_complete)


Total Rows and Columns: (84598444, 19)
Null values for every columns in DataFrame:
VendorID 0
tpep_pickup_datetime 0
tpep_dropoff_datetime 0
passenger_count 444383
trip_distance 0
RatecodeID 444383
store_and_fwd_flag 444383
PULocationID 0
DOLocationID 0
payment_type 0
fare_amount 0
extra 0
mta_tax 0
tip_amount 0
tolls_amount 0
improvement_surcharge 0
total_amount 0
congestion_surcharge 5300601
airport_fee 84598444


In [0]:
print("Mode value of each column")
print("================================")
for i in range(len(df_complete.columns)):
    print(columns[i],":",mode_values.collect()[0][i])
print("================================")
# print("Grouping by each column")
# print("================================")
# for i in df_complete.columns:
#     display(df_complete.groupBy(i).count())

Mode value of each column
VendorID : 2
tpep_pickup_datetime : 2019-02-02 12:19:58
tpep_dropoff_datetime : 2019-10-27 00:00:00
passenger_count : 1.0
trip_distance : 0.9
RatecodeID : 1.0
store_and_fwd_flag : N
PULocationID : 237
DOLocationID : 236
payment_type : 1
fare_amount : 6.0
extra : 0.0
mta_tax : 0.5
tip_amount : 0.0
tolls_amount : 0.0
improvement_surcharge : 0.3
total_amount : 9.8
congestion_surcharge : 2.5
airport_fee : None


In [0]:
import pyspark.pandas as ps
pd=ps.DataFrame(df_complete)
df_corr=pd.to_spark()
df_corr=df_corr.withColumn("date_format",to_date(df_complete.tpep_pickup_datetime,"MM-dd-yyyy"))
df_corr=df_corr.where(df_corr.date_format.contains("2019"))


In [0]:
week_level=df_corr.drop('RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',)
week_level=week_level.withColumn("week_date",date_trunc("week",week_level.date_format))
week_level=week_level.fillna({"passenger_count":1.0,
                   "congestion_surcharge":2.5,  
                  })
week_level.columns

Out[79]: ['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'date_format',
 'week_date']

In [0]:
g=week_level.groupBy("VendorID","week_date").sum()

In [0]:

#I made a little helper function for this that might help some people out.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

In [0]:
final=rename_cols(g)
final.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- week_dat: timestamp (nullable = true)
 |-- sum_VendorID: long (nullable = true)
 |-- sum_passenger_count: double (nullable = true)
 |-- sum_trip_distance: double (nullable = true)
 |-- sum_payment_type: long (nullable = true)
 |-- sum_fare_amount: double (nullable = true)
 |-- sum_extra: double (nullable = true)
 |-- sum_mta_tax: double (nullable = true)
 |-- sum_tip_amount: double (nullable = true)
 |-- sum_tolls_amount: double (nullable = true)
 |-- sum_improvement_surcharge: double (nullable = true)
 |-- sum_total_amount: double (nullable = true)
 |-- sum_congestion_surcharge: double (nullable = true)
 |-- sum_airport_fee: long (nullable = true)



In [0]:
display(final.drop("sum_VendorID","sum_payment_type","sum_passenger_count"))

VendorID,week_dat,sum_trip_distance,sum_fare_amount,sum_extra,sum_mta_tax,sum_tip_amount,sum_tolls_amount,sum_improvement_surcharge,sum_total_amount,sum_congestion_surcharge,sum_airport_fee
2,2019-07-01T00:00:00.000+0000,2461730.089999984,10179884.039999347,248428.19999999955,365968.08,1514735.6400007517,320467.0699998407,221599.4999973851,14505052.030086288,1666937.5,
2,2019-04-08T00:00:00.000+0000,3498924.779999965,15127964.06999761,410833.1999999993,554335.0,2521627.5199999763,461549.429999742,334716.2999929954,21955194.24021995,2559571.75,
2,2019-08-12T00:00:00.000+0000,2873457.2200000286,12204705.759999458,322508.49999999884,439060.5,1955759.960001432,375906.39999980485,265385.39999568585,17572912.270141356,2024428.0,
4,2019-01-07T00:00:00.000+0000,48263.73000000018,218647.8,6510.5,9510.5,34176.7299999999,5661.070000000119,5712.000000002003,280218.59999995114,47602.5,
2,2018-12-31T00:00:00.000+0000,2512871.7500000363,10244454.929998403,241815.4499999996,391804.5,1370340.9000002276,292825.5000001862,236439.5999968088,12782332.770064164,1977977.5,
1,2019-01-21T00:00:00.000+0000,1775732.800000063,8412657.509999966,208804.23,336837.77,1223623.869999883,188906.84000003504,202940.6999981087,10573770.92001498,20.0,
2,2019-06-10T00:00:00.000+0000,3325154.929999864,14302101.199999416,360205.8099999996,510667.5,2389086.2100005154,450218.47999975074,308552.69999401056,20662555.390195496,2352225.0,
5,2019-01-21T00:00:00.000+0000,75.29,279.89000000000004,0.0,8.0,31.64,11.52,4.799999999999999,335.85,40.0,
1,2019-01-14T00:00:00.000+0000,1774991.7999999742,8089842.949999959,229747.7,338499.8,1220704.989999828,196671.95000004183,203943.59999806975,10279410.990012333,1699120.0,
4,2019-01-21T00:00:00.000+0000,40690.14999999991,187731.49,5437.0,8063.5,29358.499999999764,4795.850000000087,4844.700000001476,240195.2399999653,40372.5,


In [0]:
month_level=df_corr.drop('RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',)
month_level=month_level.withColumn("Month",month(month_level.date_format))
month_level=month_level.fillna({"passenger_count":1.0,
                   "congestion_surcharge":2.5,  
                  })
month_level.columns

Out[78]: ['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'date_format',
 'Month']

In [0]:
f=month_level.groupBy("VendorID","Month").sum()
final2=rename_cols(f)
final2.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- Mont: integer (nullable = true)
 |-- sum_VendorID: long (nullable = true)
 |-- sum_passenger_count: double (nullable = true)
 |-- sum_trip_distance: double (nullable = true)
 |-- sum_payment_type: long (nullable = true)
 |-- sum_fare_amount: double (nullable = true)
 |-- sum_extra: double (nullable = true)
 |-- sum_mta_tax: double (nullable = true)
 |-- sum_tip_amount: double (nullable = true)
 |-- sum_tolls_amount: double (nullable = true)
 |-- sum_improvement_surcharge: double (nullable = true)
 |-- sum_total_amount: double (nullable = true)
 |-- sum_congestion_surcharge: double (nullable = true)
 |-- sum_airport_fee: long (nullable = true)
 |-- sum_Month: long (nullable = true)



In [0]:
display(final2.drop("sum_VendorID","sum_payment_type","sum_Month","sum_passenger_count"))

VendorID,Mont,sum_trip_distance,sum_fare_amount,sum_extra,sum_mta_tax,sum_tip_amount,sum_tolls_amount,sum_improvement_surcharge,sum_total_amount,sum_congestion_surcharge,sum_airport_fee
2,8,12825559.939997597,54322064.77000523,1428884.7000000172,1932889.75,8681133.410006532,1727795.660010087,1168148.7000855682,78080881.76741132,8880404.0,
5,1,285.6800000000001,1245.6,0.0,49.5,220.17000000000004,46.16,29.70000000000005,1410.5099999999998,247.5,
1,1,7738709.10000006,36071384.92000035,973381.2,1463008.1600000004,5200263.300002536,828683.6500009553,881517.6000410882,45418238.829568855,4642730.0,
2,6,14134425.479996435,61357935.29000942,1527938.3600000106,2175797.5,10161066.53001961,1923840.9900131545,1314760.2001083218,88443276.86681466,10025428.25,
2,9,13610311.489996975,59751313.04000386,1527361.3500000222,2104219.2,9931022.890016943,1822876.660011463,1271598.6001016262,86108856.32705237,9762034.5,
2,7,13070629.219998509,56209058.130004734,1475661.1000000157,2002161.01,9056172.78000944,1738726.3600103313,1211000.7000922195,80861290.94732818,9224474.5,
2,3,15421740.449998418,65959548.55002728,1702020.8000000145,2414141.25,10944588.630031554,1958690.0600024883,1459723.2001308238,95443507.5362394,11058970.25,
2,5,15108384.169995522,66065186.59002966,1721331.1000000108,2364913.0,10980074.16002508,2069197.230015553,1428489.9001259676,95505232.48636,10921443.0,
2,2,13237018.359997438,56998947.870031945,1509601.10000001,2146157.5,9499749.37000949,1632507.6900019788,1296768.600105523,82457820.47714151,9424652.25,
2,1,13846184.569998171,59471312.71003062,1596370.0500000096,2319821.25,8675271.550004618,1633460.1400019906,1399253.4001214372,75126040.03727005,7398427.75,


In [0]:
avg_month=df_corr.drop('RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',)
avg_month=avg_month.withColumn("Month",month(month_level.date_format))
avg_month=avg_month.fillna({"passenger_count":1.0,
                   "congestion_surcharge":2.5,  
                  })
avg_month.columns
avg_month.groupby("VendorID","Month").agg({"congestion_surcharge":"avg"}).alias('Avg_congestion_surcharge').orderBy("Month", ascending=False).show(100)
  

+--------+-----+-------------------------+
|VendorID|Month|avg(congestion_surcharge)|
+--------+-----+-------------------------+
|       5|   12|                      2.5|
|       1|   12|        2.301466098925982|
|       2|   12|       2.2898577555082156|
|       2|   11|       2.3007396702511227|
|       5|   11|                      2.5|
|       1|   11|       2.3094858841879424|
|       1|   10|       2.3137281958316755|
|       2|   10|       2.2967218808453715|
|       5|   10|                      2.5|
|       5|    9|                      2.5|
|       2|    9|        2.284518289420906|
|       1|    9|        2.295024391221862|
|       4|    9|       1.6346153846153846|
|       4|    8|       1.8181818181818181|
|       2|    8|       2.2617148570842067|
|       5|    8|                      2.5|
|       1|    8|       2.2786958490650426|
|       2|    7|        2.268817562656751|
|       4|    7|        2.326229977116705|
|       5|    7|                      2.5|
|       1| 

In [0]:
pass_month=df_corr.drop('RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',)
pass_month=pass_month.withColumn("Month",month(month_level.date_format))
pass_month=pass_month.fillna({"passenger_count":1.0,
                   "congestion_surcharge":2.5,  
                  })
pass_month.columns
pass_month.groupby("VendorID","Month").agg({"passenger_count":"sum"}).alias('Total_passenger_count').orderBy("Month").show(100)

+--------+-----+--------------------+
|VendorID|Month|sum(passenger_count)|
+--------+-----+--------------------+
|       2|    1|           8428590.0|
|       4|    1|             78704.0|
|       5|    1|                99.0|
|       1|    1|           3536755.0|
|       1|    2|           3198458.0|
|       4|    2|             52934.0|
|       5|    2|                68.0|
|       2|    2|           7807682.0|
|       1|    3|           3573563.0|
|       2|    3|           8754361.0|
|       5|    3|                46.0|
|       4|    3|             43528.0|
|       1|    4|           3411199.0|
|       4|    4|             33126.0|
|       2|    4|           8292821.0|
|       5|    4|                17.0|
|       4|    5|             29740.0|
|       1|    5|           3408856.0|
|       5|    5|                10.0|
|       2|    5|           8467842.0|
|       2|    6|           7737157.0|
|       1|    6|           3144416.0|
|       5|    6|                 5.0|
|       4|  

In [0]:
mode=[]
for i in range(len(df_complete.columns)): 
    mode.append(mode_values.collect()[0][i])
    

In [0]:
check=zip(df_complete.columns,mode)
mydict=dict(check)
a_dict = {key: mydict[key] for key in mydict if (key != 'VendorID' and key !='tpep_pickup_datetime' and key !='tpep_dropoff_datetime')}

In [0]:
a_dict 

Out[70]: {'passenger_count': 1.0,
 'trip_distance': 0.9,
 'RatecodeID': 1.0,
 'store_and_fwd_flag': 'N',
 'PULocationID': 237,
 'DOLocationID': 236,
 'payment_type': 1,
 'fare_amount': 6.0,
 'extra': 0.0,
 'mta_tax': 0.5,
 'tip_amount': 0.0,
 'tolls_amount': 0.0,
 'improvement_surcharge': 0.3,
 'total_amount': 9.8,
 'congestion_surcharge': 2.5,
 'airport_fee': None}