In [None]:
# Shell command (run in a terminal, not as Python): starts the PySpark shell on YARN
# with 2 executors of 4G memory each, pulls in the spark-csv package used by the
# CSV writes in the cells below, and sets the Spark UI port to 10308.
pyspark --master yarn --num-executors 2 --executor-memory 4G --packages com.databricks:spark-csv_2.10:1.5.0 --conf spark.ui.port=10308


In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Row	
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [None]:
# Daily weather averages: mean temperature, humidity, pressure and wind speed
# per calendar date, ordered by date and written out as CSV.
df = sqlContext.sql('select * , date(to_date(datetime)) date from kholkolg.weather')
avgs = df.groupBy("date").agg({'temperature': 'mean', 'humidity': 'mean',
                               'pressure': 'mean', 'wind_speed': 'mean'}).orderBy('date')

# The path must be absolute (leading '/'): a relative 'user/...' path is resolved
# against the current working directory, so the output would not end up next to
# the other results stored under /user/kholkolg/nyc_results.
avgs.write.format('com.databricks.spark.csv').save('/user/kholkolg/nyc_results/weather_daily_means')


In [None]:
# Trips: load the raw records, discard invalid rows and derive the duration,
# metric distance, speed and calendar columns used by the cells below.
timeFmt = "yyyy-MM-dd HH:mm:ss.SSSSSS"
kmPerMile = 1.609344
# trip duration in seconds: dropoff minus pickup, both parsed with timeFmt
timeDiff = (F.unix_timestamp('dropoff_datetime', format=timeFmt)
            - F.unix_timestamp('pickup_datetime', format=timeFmt))

# raw data contains 146.112.989 entries
df = (
    sqlContext.sql('''select count, pickup_datetime, dropoff_datetime, trip_distance from  kholkolg.nyc2015_trips ''')
    .dropna()
    # keep only trips that do not end before they start, carry at least one
    # passenger and cover a positive distance
    .filter(F.col('pickup_datetime') <= F.col('dropoff_datetime'))
    .filter('count > 0')
    .filter('trip_distance > 0')
    # duration in seconds
    .withColumn('duration', timeDiff)
    # distance converted from miles to meters, speed in m/s
    .withColumn('trip_distance', F.col('trip_distance') * kmPerMile * 1000)
    .withColumn('speed', F.col('trip_distance') / F.col('duration'))
    # drop implausible speeds (40 m/s = 144 km/h)
    .filter('speed <= 40')
    # calendar features taken from the pickup time
    .withColumn('day_of_year', F.dayofyear('pickup_datetime'))
    .withColumn('day_of_week', F.date_format('pickup_datetime', 'EEEE'))
    .withColumn('hour', F.hour(F.unix_timestamp('pickup_datetime').cast('timestamp')))
)

df.cache()

In [None]:
# Per-day totals: number of trips, number of passengers and distance travelled.
sums = (
    df.groupBy('day_of_year')
    .agg(
        F.count('*').alias('trip_count'),
        F.sum('count').alias('passenger_count'),
        F.sum('trip_distance').alias('total_distance'),
    )
    .orderBy('day_of_year')
)
(
    sums.write
    .format('com.databricks.spark.csv')
    .save('/user/kholkolg/nyc_results/trip_daily_sums')
)

In [None]:
# Number of trips per day, broken down by the passenger count of the trip.
passengers = (
    df.groupBy('day_of_year', 'count')
    .agg(F.count('*').alias('trip_count'))
    .orderBy('day_of_year')
)
(
    passengers.write
    .format('com.databricks.spark.csv')
    .save('/user/kholkolg/nyc_results/daily_passengers')
)


In [None]:
# Per weekday and hour of day: trip count, summed duration and mean speed.
weekdayDF = (
    df.groupBy('day_of_week', 'hour')
    .agg(
        F.count('*').alias('trip_counts'),
        F.sum('duration').alias('total_duration'),
        F.avg('speed').alias('average_speed'),
    )
    .orderBy('day_of_week', 'hour')
)
(
    weekdayDF.write
    .format('com.databricks.spark.csv')
    .save('/user/kholkolg/nyc_results/weekdays_stats')
)

In [None]:
## Histograms -- settings shared by the histogram cells
# bucket count handed to every RDD.histogram() call below
num_bins = 50
# NOTE(review): not referenced by any cell visible here -- confirm it is still needed
max_passenger = 10

In [None]:
# Trip distance distribution (meters): first over all trips, then restricted
# to trips of at most 3 km.
distance_hist = (
    df.select('trip_distance')
    .rdd.map(lambda row: row[0])  # unwrap the single-column Row into its value
    .histogram(num_bins)
)
print(distance_hist)

df3km = df.filter('trip_distance <= 3000')
distance_hist3km = (
    df3km.select('trip_distance')
    .rdd.map(lambda row: row[0])
    .histogram(num_bins)
)
print(distance_hist3km)

In [None]:
# Trip duration distribution (seconds): first over all trips, then restricted
# to trips of at most 30 minutes (1800 s).
duration_hist = df.select('duration').rdd.flatMap(lambda x: x).histogram(num_bins)
print(duration_hist)

df30min = df.filter('duration <= 1800')
duration_hist30min = df30min.select('duration').rdd.flatMap(lambda x: x).histogram(num_bins)
# print the histogram computed just above (the previous name, duration_df30min,
# was never defined and raised a NameError)
print(duration_hist30min)


In [None]:
# Average trip speed distribution (m/s): first over all trips, then restricted
# to trips at or below 11.2 m/s (roughly 40 km/h).
speed_hist = (
    df.select('speed')
    .rdd.map(lambda row: row[0])  # unwrap the single-column Row into its value
    .histogram(num_bins)
)
print(speed_hist)

df40kmh = df.filter('speed <= 11.2')
speed_40hist = (
    df40kmh.select('speed')
    .rdd.map(lambda row: row[0])
    .histogram(num_bins)
)
print(speed_40hist)

In [None]:
# Distribution of the number of trips per day, taken from the daily sums.
daily_counts_hist = (
    sums.select('trip_count')
    .rdd.map(lambda row: row[0])  # unwrap the single-column Row into its value
    .histogram(num_bins)
)
print(daily_counts_hist)

In [None]:
# Copy the results directory to the local machine.
# Shell command: run it from a terminal on the local machine, not inside the PySpark session.
# NOTE(review): this copies ~/nyc_results from the remote home directory, while the cells
# above save to /user/kholkolg/nyc_results -- presumably the output is first fetched
# into the home directory (e.g. with `hdfs dfs -get`); confirm.
scp -r kholkolg@hador.ics.muni.cz:~/nyc_results  /home/olga/Documents/BDT/results